http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index 8f9e553..4b20979 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -18,34 +18,22 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.analyzeTable;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
@@ -153,9 +141,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(2, rs.getInt(3));
             assertEquals("Viva Las Vegas", rs.getString(4));
-            conn1 = nextConnection(getUrl());
-            List<KeyRange> splits = getSplits(conn1, new Scan());
-            assertEquals(splits.size(), 5);
+            List<KeyRange> splits = getAllSplits(conn1, TENANT_TABLE_NAME);
+            assertEquals(3, splits.size());
         }
         finally {
             conn1.close();
@@ -490,10 +477,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         }
     }
 
-    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "ANALYZE " + tableName;
-        conn.createStatement().execute(query);
-    }
     @Test
     public void testUpsertValuesUsingViewWithNoWhereClause() throws Exception {
         Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -508,34 +491,4 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         assertFalse(rs.next());
         conn.close();
     }
-    private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
-    protected static TableRef getTableRef(Connection conn) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), PARENT_TABLE_NAME)), System.currentTimeMillis(), false);
-        return table;
-    }
 }
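Note on the test change above: the hand-rolled getSplits()/getTableRef() helpers are replaced by the shared TestUtil helpers imported at the top of the file. A minimal sketch of how they combine, assuming the signatures implied by the imports (analyzeTable(Connection, String) issuing the same "ANALYZE <table>" statement the removed private helper did, and getAllSplits(Connection, String) returning the parallel-scan key ranges):

    // Sketch only; TENANT_TABLE_NAME and nextConnection() come from the test's base class.
    Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
    try {
        analyzeTable(conn, TENANT_TABLE_NAME);                      // populate SYSTEM.STATS guideposts
        List<KeyRange> splits = getAllSplits(conn, TENANT_TABLE_NAME);
        assertEquals(3, splits.size());                             // guidepost-driven split count
    } finally {
        conn.close();
    }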
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
index d5e9d42..8f7912a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
@@ -145,7 +145,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ?
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [~'y']" :
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 KEYS OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y'] - [3,~'y']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y']\n" + 
                     "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
 
@@ -164,7 +164,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ?
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [*] - [~'x']" :
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 RANGES OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [3,~'x']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [0,~'x']\n" + 
                     "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index a6ee92e..267aec9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
@@ -51,11 +49,14 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * 
  * Client for sending cache to each region server
@@ -145,15 +146,18 @@ public class ServerCacheClient {
         ExecutorService executor = services.getExecutor();
         List<Future<Boolean>> futures = Collections.emptyList();
         try {
-            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+            PTable cacheUsingTable = cacheUsingTableRef.getTable();
+            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
             int nRegions = locations.size();
             // Size these based on worst case
             futures = new ArrayList<Future<Boolean>>(nRegions);
             Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
             for (HRegionLocation entry : locations) {
                 // Keep track of servers we've sent to and only send once
+                byte[] regionStartKey = entry.getRegionInfo().getStartKey();
+                byte[] regionEndKey = entry.getRegionInfo().getEndKey();
                 if ( ! servers.contains(entry) && 
-                        keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {  // Call RPC once per server
+                        keyRanges.intersects(regionStartKey, regionEndKey, 0) ) {
                     servers.add(entry);
                     if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
                     final byte[] key = entry.getRegionInfo().getStartKey();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index b5c14f0..271ba30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -64,4 +64,6 @@ public interface QueryPlan extends StatementPlan {
     FilterableStatement getStatement();
 
     public boolean isDegenerate();
+    
+    public boolean isRowKeyOrdered();
 }
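The ServerCacheClient change above is the first caller of the new ScanRanges.intersects(lowerInclusiveKey, upperExclusiveKey, keyOffset) contract: HBase region start keys are inclusive and end keys exclusive, which lines up with the method's lower-inclusive/upper-exclusive expectation. A hedged sketch of deciding which region servers need the hash-join cache, mirroring the loop in the diff (keyOffset 0 for a plain table):

    Set<HRegionLocation> servers = new HashSet<HRegionLocation>(locations.size());
    for (HRegionLocation entry : locations) {
        byte[] regionStartKey = entry.getRegionInfo().getStartKey();
        byte[] regionEndKey = entry.getRegionInfo().getEndKey();
        // Send the cache at most once per server, and only if the scan
        // ranges can produce rows within this region's key span.
        if (!servers.contains(entry) && keyRanges.intersects(regionStartKey, regionEndKey, 0)) {
            servers.add(entry);
        }
    }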
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index dc8e0b3..1bd8cef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -23,12 +23,16 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -40,28 +44,40 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, false, false);
-    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, false, false);
+    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null);
+    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null);
+    private static final Scan HAS_INTERSECTION = new Scan();
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan) {
-        return create(schema, ranges, slotSpan, false, null);
+        return create(schema, ranges, slotSpan, KeyRange.EVERYTHING_RANGE, false, null);
     }
 
-    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, boolean forceRangeScan, Integer nBuckets) {
-        int offset = nBuckets == null ? 0 : 1;
-        if (ranges.size() == offset) {
+    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, boolean forceRangeScan, Integer nBuckets) {
+        int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
+        if (ranges.size() == offset && minMaxRange == KeyRange.EVERYTHING_RANGE) {
             return EVERYTHING;
-        } else if (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE) {
+        } else if (minMaxRange == KeyRange.EMPTY_RANGE || (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
             return NOTHING;
         }
         boolean isPointLookup = !forceRangeScan && ScanRanges.isPointLookup(schema, ranges, slotSpan);
         if (isPointLookup) {
-            // TODO: consider keeping original to use for serialization as it would
-            // be smaller?
+            // TODO: consider keeping original to use for serialization as it would be smaller?
             List<byte[]> keys = ScanRanges.getPointKeys(ranges, slotSpan, schema, nBuckets);
             List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
+            KeyRange unsaltedMinMaxRange = minMaxRange;
+            if (nBuckets != null && minMaxRange != KeyRange.EVERYTHING_RANGE) {
+                unsaltedMinMaxRange = KeyRange.getKeyRange(
+                        stripPrefix(minMaxRange.getLowerRange(),offset),
+                        minMaxRange.lowerUnbound(),
+                        stripPrefix(minMaxRange.getUpperRange(),offset),
+                        minMaxRange.upperUnbound());
+            }
             for (byte[] key : keys) {
-                keyRanges.add(KeyRange.getKeyRange(key));
+                // Filter now based on unsalted minMaxRange and ignore the point key salt byte
+                if ( unsaltedMinMaxRange.compareLowerToUpperBound(key, offset, key.length-offset, true) <= 0 &&
+                     unsaltedMinMaxRange.compareUpperToLowerBound(key, offset, key.length-offset, true) >= 0 ) {
+                    keyRanges.add(KeyRange.getKeyRange(key));
+                }
             }
             ranges = Collections.singletonList(keyRanges);
             if (keys.size() > 1) {
@@ -72,39 +88,286 @@ public class ScanRanges {
                 // when there's a single key.
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
-        } else if (nBuckets != null) {
-            List<List<KeyRange>> saltedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-            saltedRanges.add(SaltingUtil.generateAllSaltingRanges(nBuckets));
-            saltedRanges.addAll(ranges.subList(1, ranges.size()));
-            ranges = saltedRanges;
         }
-        return new ScanRanges(schema, slotSpan, ranges, forceRangeScan, isPointLookup);
+        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
+        }
+        boolean useSkipScanFilter = useSkipScanFilter(forceRangeScan, isPointLookup, sortedRanges);
+
+        // Don't set minMaxRange for point lookup because it causes issues during intersect
+        // by going across region boundaries
+        KeyRange scanRange = KeyRange.EVERYTHING_RANGE;
+        // if (!isPointLookup && (nBuckets == null || !useSkipScanFilter)) {
+        // if (! ( isPointLookup || (nBuckets != null && useSkipScanFilter) ) ) {
+        // if (nBuckets == null || (nBuckets != null && (!isPointLookup || !useSkipScanFilter))) {
+        if (nBuckets == null || !isPointLookup || !useSkipScanFilter) {
+            byte[] minKey = ScanUtil.getMinKey(schema, sortedRanges, slotSpan);
+            byte[] maxKey = ScanUtil.getMaxKey(schema, sortedRanges, slotSpan);
+            // If the maxKey has crossed the salt byte boundary, then we do not
+            // have anything to filter at the upper end of the range
+            if (ScanUtil.crossesPrefixBoundary(maxKey, ScanUtil.getPrefix(minKey, offset), offset)) {
+                maxKey = KeyRange.UNBOUND;
+            }
+            // We won't filter anything at the low end of the range if we just have the salt byte
+            if (minKey.length <= offset) {
+                minKey = KeyRange.UNBOUND;
+            }
+            scanRange = KeyRange.getKeyRange(minKey, maxKey);
+        }
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
+            minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable());
+            scanRange = scanRange.intersect(minMaxRange);
+        }
+
+        if (scanRange == KeyRange.EMPTY_RANGE) {
+            return NOTHING;
+        }
+        return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScanFilter, isPointLookup, nBuckets);
     }
 
     private SkipScanFilter filter;
     private final List<List<KeyRange>> ranges;
     private final int[] slotSpan;
     private final RowKeySchema schema;
-    private final boolean forceRangeScan;
     private final boolean isPointLookup;
+    private final boolean isSalted;
+    private final boolean useSkipScanFilter;
+    private final KeyRange scanRange;
+    private final KeyRange minMaxRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, boolean forceRangeScan, boolean isPointLookup) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum) {
         this.isPointLookup = isPointLookup;
-        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-        for (int i = 0; i < ranges.size(); i++) {
-            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
-            Collections.sort(sorted, KeyRange.COMPARATOR);
-            sortedRanges.add(ImmutableList.copyOf(sorted));
+        this.isSalted = bucketNum != null;
+        this.useSkipScanFilter = useSkipScanFilter;
+        this.scanRange = scanRange;
+        this.minMaxRange = minMaxRange;
+
+        // Only blow out the bucket values if we're using the skip scan. We need all the
+        // bucket values in this case because we use intersect against a key that may have
+        // any of the possible bucket values. Otherwise, we can pretty easily ignore the
+        // bucket values.
+        if (useSkipScanFilter && isSalted && !isPointLookup) {
+            ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
-        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.ranges = ImmutableList.copyOf(ranges);
         this.slotSpan = slotSpan;
         this.schema = schema;
-        if (schema != null && !ranges.isEmpty()) {
+        if (schema != null && !ranges.isEmpty()) { // TODO: only create if useSkipScanFilter is true?
            this.filter = new SkipScanFilter(this.ranges, slotSpan, schema);
         }
-        this.forceRangeScan = forceRangeScan;
+    }
+
+    /**
+     * Get the minMaxRange that is applied in addition to the scan range.
+     * Only used by the ExplainTable to generate the explain plan.
+     */
+    public KeyRange getMinMaxRange() {
+        return minMaxRange;
+    }
+
+    public void initializeScan(Scan scan) {
+        scan.setStartRow(scanRange.getLowerRange());
+        scan.setStopRow(scanRange.getUpperRange());
+    }
+
+    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+        if (key.length > 0) {
+            byte[] newKey = new byte[key.length + prefixKeyOffset];
+            int totalKeyOffset = keyOffset + prefixKeyOffset;
+            if (prefixKey.length >= totalKeyOffset) { // otherwise it's null padded
+                System.arraycopy(prefixKey, 0, newKey, 0, totalKeyOffset);
+            }
+            System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
+            return newKey;
+        }
+        return key;
+    }
+
+    private static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length];
+        if (saltKey.length >= SaltingUtil.NUM_SALTING_BYTES) { // Otherwise it's null padded
+            System.arraycopy(saltKey, 0, temp, 0, SaltingUtil.NUM_SALTING_BYTES);
+        }
+        System.arraycopy(key, SaltingUtil.NUM_SALTING_BYTES, temp, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES);
+        return temp;
+    }
+
+    private static byte[] stripPrefix(byte[] key, int keyOffset) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length - keyOffset];
+        System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
+        return temp;
+    }
+
+    public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset) {
+        byte[] startKey = originalStartKey;
+        byte[] stopKey = originalStopKey;
+        boolean mayHaveRows = false;
+        // Keep the keys as they are if we have a point lookup, as we've already resolved the
+        // salt bytes in that case.
+        final int scanKeyOffset = this.isSalted && !this.isPointLookup ? SaltingUtil.NUM_SALTING_BYTES : 0;
+        assert (scanKeyOffset == 0 || keyOffset == 0);
+        // Offset for startKey/stopKey. Either 1 for salted tables or the prefix length
+        // of the current region for local indexes.
+        final int totalKeyOffset = scanKeyOffset + keyOffset;
+        // In this case, we've crossed the "prefix" boundary and should consider everything after the startKey
+        // This prevents us from having to prefix the key prior to knowing whether or not there may be an
+        // intersection.
+        byte[] prefixBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (totalKeyOffset > 0) {
+            prefixBytes = ScanUtil.getPrefix(startKey, totalKeyOffset);
+            if (ScanUtil.crossesPrefixBoundary(stopKey, prefixBytes, totalKeyOffset)) {
+                stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        int scanStartKeyOffset = scanKeyOffset;
+        byte[] scanStartKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStartRow();
+        // Compare ignoring key prefix and salt byte
+        if (scanStartKey.length > 0) {
+            if (startKey.length > 0 && Bytes.compareTo(scanStartKey, scanKeyOffset, scanStartKey.length - scanKeyOffset, startKey, totalKeyOffset, startKey.length - totalKeyOffset) < 0) {
+                scanStartKey = startKey;
+                scanStartKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStartKey = startKey;
+            scanStartKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        int scanStopKeyOffset = scanKeyOffset;
+        byte[] scanStopKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStopRow();
+        if (scanStopKey.length > 0) {
+            if (stopKey.length > 0 && Bytes.compareTo(scanStopKey, scanKeyOffset, scanStopKey.length - scanKeyOffset, stopKey, totalKeyOffset, stopKey.length - totalKeyOffset) > 0) {
+                scanStopKey = stopKey;
+                scanStopKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStopKey = stopKey;
+            scanStopKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scanStartKey, scanStartKeyOffset, scanStartKey.length - scanStartKeyOffset, scanStopKey, scanStopKeyOffset, scanStopKey.length - scanStopKeyOffset) < 0;
+        
+        if (!mayHaveRows) {
+            return null;
+        }
+        if (originalStopKey.length != 0 && scanStopKey.length == 0) {
+            scanStopKey = originalStopKey;
+        }
+        Filter newFilter = null;
+        // If the scan is using skip scan filter, intersect and replace the filter.
+        if (scan == null || this.useSkipScanFilter()) {
+            byte[] skipScanStartKey = scanStartKey;
+            byte[] skipScanStopKey = scanStopKey;
+            // If we have a keyOffset and we've used the startKey/stopKey that
+            // were passed in (which have the prefix) for the above range check,
+            // we need to remove the prefix before running our intersect method.
+            // TODO: we could use skipScanFilter.setOffset(keyOffset) if both
+            // the startKey and stopKey were used above *and* our intersect
+            // method honored the skipScanFilter.offset variable.
+            if (scanKeyOffset > 0) {
+                if (skipScanStartKey != originalStartKey) { // original already has correct salt byte
+                    skipScanStartKey = replaceSaltByte(skipScanStartKey, prefixBytes);
+                }
+                if (skipScanStopKey != originalStopKey) {
+                    skipScanStopKey = replaceSaltByte(skipScanStopKey, prefixBytes);
+                }
+            } else if (keyOffset > 0) {
+                if (skipScanStartKey == originalStartKey) {
+                    skipScanStartKey = stripPrefix(skipScanStartKey, keyOffset);
+                }
+                if (skipScanStopKey == originalStopKey) {
+                    skipScanStopKey = stripPrefix(skipScanStopKey, keyOffset);
+                }
+            }
+            if (scan == null) {
+                return filter.hasIntersect(skipScanStartKey, skipScanStopKey) ? HAS_INTERSECTION : null;
+            }
+            Filter filter = scan.getFilter();
+            SkipScanFilter newSkipScanFilter = null;
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldSkipScanFilter = (SkipScanFilter)filter;
+                newFilter = newSkipScanFilter = oldSkipScanFilter.intersect(skipScanStartKey, skipScanStopKey);
+                if (newFilter == null) {
+                    return null;
+                }
+            } else if (filter instanceof FilterList) {
+                FilterList oldList = (FilterList)filter;
+                FilterList newList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+                newFilter = newList;
+                for (Filter f : oldList.getFilters()) {
+                    if (f instanceof SkipScanFilter) {
+                        newSkipScanFilter = ((SkipScanFilter)f).intersect(skipScanStartKey, skipScanStopKey);
+                        if (newSkipScanFilter == null) {
+                            return null;
+                        }
+                        newList.addFilter(newSkipScanFilter);
+                    } else {
+                        newList.addFilter(f);
+                    }
+                }
+            }
+            // TODO: it seems that our SkipScanFilter or HBase runs into problems if we don't
+            // have an enclosing range when we do a point lookup.
+            if (isPointLookup) {
+                scanStartKey = ScanUtil.getMinKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+                scanStopKey = ScanUtil.getMaxKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+            }
+        }
+        if (newFilter == null) {
+            newFilter = scan.getFilter();
+        }
+        Scan newScan = ScanUtil.newScan(scan);
+        newScan.setFilter(newFilter);
+        // If we have an offset (salted table or local index), we need to make sure to
+        // prefix our scan start/stop row by the prefix of the startKey or stopKey that
+        // were passed in. Our scan either doesn't have the prefix or has a placeholder
+        // for it.
+        if (totalKeyOffset > 0) {
+            if (scanStartKey != originalStartKey) {
+                scanStartKey = prefixKey(scanStartKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+            if (scanStopKey != originalStopKey) {
+                scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+        }
+        newScan.setStartRow(scanStartKey);
+        newScan.setStopRow(scanStopKey);
+        
+        return newScan;
     }
 
+    /**
+     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
+     * intersects with any of the scan ranges and false otherwise. We cannot pass in
+     * a KeyRange here, because the underlying compare functions expect lower inclusive
+     * and upper exclusive keys. We cannot get their next key because the key must
+     * conform to the row key schema and if a null byte is added to a lower inclusive
+     * key, it's no longer a valid, real key.
+     * @param lowerInclusiveKey lower inclusive key
+     * @param upperExclusiveKey upper exclusive key
+     * @return true if the scan range intersects with the specified lower/upper key
+     * range
+     */
+    public boolean intersects(byte[] lowerInclusiveKey, byte[] upperExclusiveKey, int keyOffset) {
+        if (isEverything()) {
+            return true;
+        }
+        if (isDegenerate()) {
+            return false;
+        }
+        
+        //return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
+        return intersectScan(null, lowerInclusiveKey, upperExclusiveKey, keyOffset) == HAS_INTERSECTION;
+    }
+    
     public SkipScanFilter getSkipScanFilter() {
         return filter;
     }
@@ -132,11 +395,15 @@ public class ScanRanges {
      * not the last key slot
      */
     public boolean useSkipScanFilter() {
+        return useSkipScanFilter;
+    }
+    
+    private static boolean useSkipScanFilter(boolean forceRangeScan, boolean isPointLookup, List<List<KeyRange>> ranges) {
        if (forceRangeScan) {
            return false;
        }
        if (isPointLookup) {
-            return getPointLookupCount() > 1;
+            return getPointLookupCount(isPointLookup, ranges) > 1;
        }
        boolean hasRangeKey = false, useSkipScan = false;
        for (List<KeyRange> orRanges : ranges) {
@@ -208,6 +475,10 @@ public class ScanRanges {
    }
 
    public int getPointLookupCount() {
+        return getPointLookupCount(isPointLookup, ranges);
+    }
+    
+    private static int getPointLookupCount(boolean isPointLookup, List<List<KeyRange>> ranges) {
        return isPointLookup ? ranges.get(0).size() : 0;
    }
    
@@ -215,51 +486,6 @@ public class ScanRanges {
        return isPointLookup ? ranges.get(0).iterator() : Iterators.<KeyRange>emptyIterator();
    }
 
-    public void setScanStartStopRow(Scan scan) {
-        if (isEverything()) {
-            return;
-        }
-        if (isDegenerate()) {
-            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
-            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
-            return;
-        }
-        
-        byte[] expectedKey;
-        expectedKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStartRow(expectedKey);
-        }
-        expectedKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStopRow(expectedKey);
-        }
-    }
-
-    public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
-
-    /**
-     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
-     * intersects with any of the scan ranges and false otherwise. We cannot pass in
-     * a KeyRange here, because the underlying compare functions expect lower inclusive
-     * and upper exclusive keys. We cannot get their next key because the key must
-     * conform to the row key schema and if a null byte is added to a lower inclusive
-     * key, it's no longer a valid, real key.
-     * @param lowerInclusiveKey lower inclusive key
-     * @param upperExclusiveKey upper exclusive key
-     * @return true if the scan range intersects with the specified lower/upper key
-     * range
-     */
-    public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
-        if (isEverything()) {
-            return true;
-        }
-        if (isDegenerate()) {
-            return false;
-        }
-        return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
-    }
-
     public int getPkColumnSpan() {
         return this == ScanRanges.NOTHING ? 0 : ScanUtil.getTotalSpan(ranges, slotSpan);
     }
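Much of the new ScanRanges code above is salt-byte bookkeeping: for a salted table the first SaltingUtil.NUM_SALTING_BYTES of every row key hold a hash bucket, so stripPrefix()/replaceSaltByte() peel that byte off (or swap it) before keys are compared against the unsalted minMaxRange. A hedged, self-contained illustration of what stripPrefix() computes (the byte values are illustrative, not Phoenix API):

    byte[] saltedKey = new byte[] { 2, 'a', 'b', 'c' };   // bucket byte 2, logical key "abc"
    int offset = 1;                                       // SaltingUtil.NUM_SALTING_BYTES
    byte[] unsalted = new byte[saltedKey.length - offset];
    System.arraycopy(saltedKey, offset, unsalted, 0, unsalted.length);
    // unsalted == { 'a', 'b', 'c' }: the form compared against the
    // unsalted minMaxRange in ScanRanges.create() above.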
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3cb6ce9..887ca3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,7 +31,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -39,7 +38,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Maps;
 
@@ -67,7 +65,6 @@ public class StatementContext {
     private long currentTime = QueryConstants.UNSET_TIMESTAMP;
     private ScanRanges scanRanges = ScanRanges.EVERYTHING;
-    private KeyRange minMaxRange = null;
     private final SequenceManager sequences;
     private TableRef currentTable;
@@ -158,41 +155,8 @@ public class StatementContext {
     }
 
     public void setScanRanges(ScanRanges scanRanges) {
-        setScanRanges(scanRanges, null);
-    }
-
-    public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
         this.scanRanges = scanRanges;
-        this.scanRanges.setScanStartStopRow(scan);
-        PTable table = this.getCurrentTable().getTable();
-        if (minMaxRange != null) {
-            // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
-            // what we need to intersect against for the HBase scan.
-            byte[] lowerRange = minMaxRange.getLowerRange();
-            if (!minMaxRange.lowerUnbound()) {
-                if (!minMaxRange.isLowerInclusive()) {
-                    lowerRange = ScanUtil.nextKey(lowerRange, table, tempPtr);
-                }
-            }
-            
-            byte[] upperRange = minMaxRange.getUpperRange();
-            if (!minMaxRange.upperUnbound()) {
-                if (minMaxRange.isUpperInclusive()) {
-                    upperRange = ScanUtil.nextKey(upperRange, table, tempPtr);
-                }
-            }
-            if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
-                minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
-            }
-            // If we're not salting, we can intersect this now with the scan range.
-            // Otherwise, we have to wait to do this when we chunk up the scan.
-            if (table.getBucketNum() == null) {
-                minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
-                scan.setStartRow(minMaxRange.getLowerRange());
-                scan.setStopRow(minMaxRange.getUpperRange());
-            }
-            this.minMaxRange = minMaxRange;
-        }
+        scanRanges.initializeScan(scan);
     }
 
     public PhoenixConnection getConnection() {
@@ -224,14 +188,6 @@ public class StatementContext {
         return currentTime;
     }
 
-    /**
-     * Get the key range derived from row value constructor usage in where clause. These are orthogonal to the ScanRanges
-     * and form a range for which each scan is intersected against.
-     */
-    public KeyRange getMinMaxRange () {
-        return minMaxRange;
-    }
-
     public SequenceManager getSequenceManager(){
         return sequences;
     }
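With this change StatementContext no longer normalizes or intersects the RVC minMaxRange itself; ScanRanges.create() has already done that (via ScanUtil.convertToInclusiveExclusiveRange), so setScanRanges() just stamps the precomputed boundaries onto the HBase scan. A hedged reminder of the semantics, since HBase scan boundaries are lower-inclusive/upper-exclusive:

    Scan scan = new Scan();
    scanRanges.initializeScan(scan);
    // equivalent to:
    //   scan.setStartRow(scanRange.getLowerRange());  // inclusive
    //   scan.setStopRow(scanRange.getUpperRange());   // exclusive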
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 9cd7e01..634bd15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -146,14 +147,30 @@ public class WhereOptimizer {
         RowKeySchema schema = table.getRowKeySchema();
         List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(schema.getMaxFields());
         KeyRange minMaxRange = keySlots.getMinMaxRange();
-        boolean hasMinMaxRange = (minMaxRange != null);
+        if (minMaxRange == null) {
+            minMaxRange = KeyRange.EVERYTHING_RANGE;
+        }
+        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
         int minMaxRangeOffset = 0;
         byte[] minMaxRangePrefix = null;
+        boolean isSalted = nBuckets != null;
+        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+        boolean hasViewIndex = table.getViewIndexId() != null;
+        if (hasMinMaxRange) {
+            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0)
+                    + (isMultiTenant ? tenantId.getBytes().length + 1 : 0)
+                    + (hasViewIndex ? MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
+            minMaxRangePrefix = new byte[minMaxRangeSize];
+        }
         
         Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
         // Add placeholder for salt byte ranges
-        if (nBuckets != null) {
+        if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
+            if (hasMinMaxRange) {
+                System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
+                minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
+            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -161,13 +178,12 @@ public class WhereOptimizer {
         }
         
         // Add tenant data isolation for tenant-specific tables
-        if (tenantId != null && table.isMultiTenant()) {
+        if (isMultiTenant) {
             byte[] tenantIdBytes = tenantId.getBytes();
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
             cnf.add(singletonList(tenantIdKeyRange));
             if (hasMinMaxRange) {
-                minMaxRangePrefix = new byte[tenantIdBytes.length + MetaDataUtil.getViewIndexIdDataType().getByteSize() + 1];
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 0, tenantIdBytes.length);
+                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, minMaxRangeOffset, tenantIdBytes.length);
                 minMaxRangeOffset += tenantIdBytes.length;
                 if (!schema.getField(pkPos).getDataType().isFixedWidth()) {
                     minMaxRangePrefix[minMaxRangeOffset] = QueryConstants.SEPARATOR_BYTE;
@@ -178,14 +194,11 @@ public class WhereOptimizer {
         }
         
         // Add unique index ID for shared indexes on views. This ensures
         // that different indexes don't interleave.
-        if (table.getViewIndexId() != null) {
+        if (hasViewIndex) {
             byte[] viewIndexBytes = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
             cnf.add(singletonList(indexIdKeyRange));
             if (hasMinMaxRange) {
-                if (minMaxRangePrefix == null) {
-                    minMaxRangePrefix = new byte[viewIndexBytes.length];
-                }
                 System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, minMaxRangeOffset, viewIndexBytes.length);
                 minMaxRangeOffset += viewIndexBytes.length;
             }
@@ -194,7 +207,7 @@ public class WhereOptimizer {
         
         // Prepend minMaxRange with fixed column values so we can properly intersect the
         // range with the other range.
-        if (minMaxRange != null) {
+        if (hasMinMaxRange) {
             minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, minMaxRangeOffset);
         }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
@@ -285,9 +298,9 @@ public class WhereOptimizer {
             // If we have fully qualified point keys with multi-column spans (i.e. RVC),
             // we can still use our skip scan. The ScanRanges.create() call will explode
             // out the keys.
-            context.setScanRanges(
-                    ScanRanges.create(schema, cnf, Arrays.copyOf(slotSpan, cnf.size()), forcedRangeScan, nBuckets),
-                    minMaxRange);
+            slotSpan = Arrays.copyOf(slotSpan, cnf.size());
+            ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, minMaxRange, forcedRangeScan, nBuckets);
+            context.setScanRanges(scanRanges);
             if (whereClause == null) {
                 return null;
             } else {
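A hedged worked example of the minMaxRangePrefix assembled above for the fullest case, a salted, multi-tenant view index with a variable-width tenant ID column (component sizes follow the diff; the values are illustrative):

    // Layout: [ salt placeholder | tenantId | 0x00 separator | viewIndexId ]
    byte[] prefix = new byte[1 + tenantIdBytes.length + 1 + viewIndexIdBytes.length];
    int offset = 0;
    prefix[offset++] = 0;                                 // salt byte placeholder
    System.arraycopy(tenantIdBytes, 0, prefix, offset, tenantIdBytes.length);
    offset += tenantIdBytes.length;
    prefix[offset++] = 0;                                 // QueryConstants.SEPARATOR_BYTE (var-width pk only)
    System.arraycopy(viewIndexIdBytes, 0, prefix, offset, viewIndexIdBytes.length);
    offset += viewIndexIdBytes.length;
    // minMaxRange = minMaxRange.prependRange(prefix, 0, offset);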
PNameFactory.newName(SchemaUtil.getTableName(
                schemaName.getString(), tableName.getString())) : physicalTables.get(0);
-        PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns);
+        PTableStats stats = tenantId == null ? updateStatsInternal(physicalTableName.getBytes()) : null;
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName,
                 saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows,
@@ -462,62 +467,48 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 viewIndexId, stats);
     }
 
-    private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns) throws IOException {
-        List<PName> family = Lists.newArrayListWithExpectedSize(columns.size());
-        for (PColumn column : columns) {
-            PName familyName = column.getFamilyName();
-            if (familyName != null) {
-                family.add(familyName);
-            }
-        }
+    private PTableStats updateStatsInternal(byte[] tableNameBytes) throws IOException {
         HTable statsHTable = null;
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         try {
             // Can we do a new HTable instance here? Or get it from a pool or cache of these instances?
             statsHTable = new HTable(getEnvironment().getConfiguration(), PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
-            Scan s = new Scan();
-            if (tableNameBytes != null) {
-                // Check for an efficient way here
-                s.setStartRow(tableNameBytes);
-                s.setStopRow(ByteUtil.nextKey(tableNameBytes));
-            }
+            Scan s = newTableRowsScan(tableNameBytes);
+            s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
             ResultScanner scanner = statsHTable.getScanner(s);
             Result result = null;
-            byte[] fam = null;
-            List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size());
             TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
             while ((result = scanner.next()) != null) {
-                KeyValue[] kvs = result.raw();
-                for(KeyValue kv : kvs) {
-                    // For now collect only guide posts
-                    if (Bytes.equals(kv.getQualifier(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES)) {
-                        byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, kv.getRow());
-                        if (fam == null) {
-                            fam = cfInCell;
-                        } else if (!Bytes.equals(fam, cfInCell)) {
-                            // Sort all the guide posts
-                            guidePostsPerCf.put(cfInCell, guidePosts);
-                            guidePosts = new ArrayList<byte[]>();
-                            fam = cfInCell;
-                        }
-                        byte[] guidePostVal = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(),
-                                kv.getValueLength()).copyBytesIfNecessary();
-                        PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal);
-                        if (array != null && array.getDimensions() != 0) {
-                            for (int j = 0; j < array.getDimensions(); j++) {
-                                byte[] gp = array.toBytes(j);
-                                if (gp.length != 0) {
-                                    guidePosts.add(gp);
-                                }
-                            }
+                KeyValue current = result.raw()[0];
+                int tableNameLength = tableNameBytes.length + 1;
+                int cfOffset = current.getRowOffset() + tableNameLength;
+                int cfLength = getVarCharLength(current.getRow(), cfOffset, current.getRowLength() - tableNameLength);
+                ptr.set(current.getRow(), cfOffset, cfLength);
+                byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValue(), current.getValueOffset(), current
+                        .getValueLength());
+                if (array != null && array.getDimensions() != 0) {
+                    List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());
+                    for (int j = 0; j < array.getDimensions(); j++) {
+                        byte[] gp = array.toBytes(j);
+                        if (gp.length != 0) {
+                            guidePosts.add(gp);
                         }
                     }
+                    List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
+                    if (gps != null) { // Add guidepost already there from other regions
+                        guidePosts.addAll(gps);
+                    }
                 }
             }
-            if (fam != null) {
-                // Sort all the guideposts
-                guidePostsPerCf.put(fam, guidePosts);
+            if (!guidePostsPerCf.isEmpty()) {
+                // Sort guideposts, as the order above will depend on the order we traverse
+                // each region's worth of guideposts above.
+                for (List<byte[]> gps : guidePostsPerCf.values()) {
+                    Collections.sort(gps, Bytes.BYTES_COMPARATOR);
+                }
+                return new PTableStatsImpl(guidePostsPerCf);
             }
-            return new PTableStatsImpl(guidePostsPerCf);
         } catch (Exception e) {
             if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) {
                 logger.warn("Stats table not yet online", e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d45e036..9f294a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -65,7 +65,7 @@ import org.apache.phoenix.schema.TableRef;
  *
  * @since 0.1
  */
-public class AggregatePlan extends BasicQueryPlan {
+public class AggregatePlan extends BaseQueryPlan {
     private final Aggregators aggregators;
     private final Expression having;
     private List<KeyRange> splits;
@@ -164,7 +164,7 @@ public class AggregatePlan extends BasicQueryPlan {
              */
            context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PDataType.INTEGER.toBytes(limit));
         }
-        ParallelIterators parallelIterators = new ParallelIterators(context, tableRef, statement, projection, groupBy, null, wrapParallelIteratorFactory());
+        ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory());
         splits = parallelIterators.getSplits();
 
         AggregatingResultIterator aggResultIterator;
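The reworked updateStatsInternal() above reads one SYSTEM.STATS row per region per column family, so guideposts for the same family can arrive split across several rows and out of row-key order. A hedged sketch of the merge-then-sort pattern it uses:

    TreeMap<byte[], List<byte[]>> guidePostsPerCf =
            new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
    // For each scanned row: put the new list, folding in any list already
    // present for the same family (i.e. from another region).
    List<byte[]> previous = guidePostsPerCf.put(cfName, guidePosts);
    if (previous != null) {
        guidePosts.addAll(previous);
    }
    // One sort at the end restores row-key order across regions.
    for (List<byte[]> gps : guidePostsPerCf.values()) {
        Collections.sort(gps, Bytes.BYTES_COMPARATOR);
    }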
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
new file mode 100644
index 0000000..eb43aa4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.collect.Lists;
+
+
+
+/**
+ *
+ * Query plan that has no child plans
+ *
+ *
+ * @since 0.1
+ */
+public abstract class BaseQueryPlan implements QueryPlan {
+    protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
+
+    protected final TableRef tableRef;
+    protected final StatementContext context;
+    protected final FilterableStatement statement;
+    protected final RowProjector projection;
+    protected final ParameterMetaData paramMetaData;
+    protected final Integer limit;
+    protected final OrderBy orderBy;
+    protected final GroupBy groupBy;
+    protected final ParallelIteratorFactory parallelIteratorFactory;
+
+    protected BaseQueryPlan(
+            StatementContext context, FilterableStatement statement, TableRef table,
+            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+        this.context = context;
+        this.statement = statement;
+        this.tableRef = table;
+        this.projection = projection;
+        this.paramMetaData = paramMetaData;
+        this.limit = limit;
+        this.orderBy = orderBy;
+        this.groupBy = groupBy;
+        this.parallelIteratorFactory = parallelIteratorFactory;
+    }
+
+    @Override
+    public boolean isDegenerate() {
+        return context.getScanRanges() == ScanRanges.NOTHING;
+
+    }
+
+    @Override
+    public GroupBy getGroupBy() {
+        return groupBy;
+    }
+
+
+    @Override
+    public OrderBy getOrderBy() {
+        return orderBy;
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return tableRef;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return limit;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return projection;
+    }
+
+//    /**
+//     * Sets up an id used to do round robin queue processing on the server
+//     * @param scan
+//     */
+//    private void setProducer(Scan scan) {
+//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
+//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
+//    }
+
+    @Override
+    public final ResultIterator iterator() throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
+
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return ResultIterator.EMPTY_ITERATOR;
+        }
+        
+        Scan scan = context.getScan();
+        // Set producer on scan so HBase server does round robin processing
+        //setProducer(scan);
+        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
+        // The time stamp comes from the server at compile time when the meta data
+        // is resolved.
+        // TODO: include time range in explain plan?
+        PhoenixConnection connection = context.getConnection();
+        if (context.getScanTimeRange() == null) {
+            Long scn = connection.getSCN();
+            if (scn == null) {
+                scn = context.getCurrentTime();
+                // Add one to server time since max of time range is exclusive
+                // and we need to account of OSs with lower resolution clocks.
+                if (scn < HConstants.LATEST_TIMESTAMP) {
+                    scn++;
+                }
+            }
+            ScanUtil.setTimeRange(scan, scn);
+        } else {
+            try {
+                scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+        }
+        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
+        ResultIterator iterator = newIterator();
+        return dependencies.isEmpty() ?
+                iterator : new DelegateResultIterator(iterator) {
+            @Override
+            public void close() throws SQLException {
+                try {
+                    super.close();
+                } finally {
+                    SQLCloseables.closeAll(dependencies);
+                }
+            }
+        };
+    }
+
+    abstract protected ResultIterator newIterator() throws SQLException;
+
+    @Override
+    public long getEstimatedSize() {
+        return DEFAULT_ESTIMATED_SIZE;
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return paramMetaData;
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
+        }
+        
+        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
+        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
+        ResultIterator iterator = plan.iterator();
+        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
+        iterator.explain(planSteps);
+        return new ExplainPlan(planSteps);
+    }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
+    }
+}
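One subtlety worth noting in BaseQueryPlan.iterator() above: the upper bound of an HBase time range is exclusive, so the compile-time server timestamp is bumped by one before being used; otherwise rows written at exactly that timestamp would be invisible. Extracted for emphasis (same logic as in the new file):

    Long scn = connection.getSCN();
    if (scn == null) {
        scn = context.getCurrentTime();
        // Max of the time range is exclusive; the increment also guards
        // against OSs with lower-resolution clocks.
        if (scn < HConstants.LATEST_TIMESTAMP) {
            scn++;
        }
    }
    ScanUtil.setTimeRange(scan, scn);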
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.execute; - -import java.io.IOException; -import java.sql.ParameterMetaData; -import java.sql.SQLException; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.phoenix.compile.ExplainPlan; -import org.apache.phoenix.compile.GroupByCompiler.GroupBy; -import org.apache.phoenix.compile.OrderByCompiler.OrderBy; -import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.DelegateResultIterator; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; -import org.apache.phoenix.iterate.ResultIterator; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.SQLCloseable; -import org.apache.phoenix.util.SQLCloseables; -import org.apache.phoenix.util.ScanUtil; - -import com.google.common.collect.Lists; - - - -/** - * - * Query plan that has no child plans - * - * - * @since 0.1 - */ -public abstract class BasicQueryPlan implements QueryPlan { - protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K - - protected final TableRef tableRef; - protected final StatementContext context; - protected final FilterableStatement statement; - protected final RowProjector projection; - protected final ParameterMetaData paramMetaData; - protected final Integer limit; - protected final OrderBy orderBy; - protected final GroupBy groupBy; - protected final ParallelIteratorFactory parallelIteratorFactory; - - protected BasicQueryPlan( - StatementContext context, FilterableStatement statement, TableRef table, - RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy, - GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) { - this.context = context; - this.statement = statement; - this.tableRef = table; - this.projection = projection; - this.paramMetaData = paramMetaData; - this.limit = limit; - this.orderBy = orderBy; - this.groupBy = groupBy; - this.parallelIteratorFactory = parallelIteratorFactory; - } - - @Override - public boolean isDegenerate() { - return context.getScanRanges() == ScanRanges.NOTHING; - - } - - @Override - public GroupBy getGroupBy() { - return groupBy; - } - - - @Override - public OrderBy getOrderBy() { - return orderBy; - } - - @Override - public TableRef getTableRef() { - return tableRef; - } - - @Override - public Integer getLimit() { - return limit; - } - - @Override - public RowProjector getProjector() { - return projection; - } - -// /** -// * Sets up an id used to do round robin queue processing on the server -// * @param scan -// */ -// private void setProducer(Scan scan) { -// byte[] producer = 
-//    /**
-//     * Sets up an id used to do round robin queue processing on the server
-//     * @param scan
-//     */
-//    private void setProducer(Scan scan) {
-//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
-//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
-//    }
-
-    @Override
-    public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList());
-    }
-
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return ResultIterator.EMPTY_ITERATOR;
-        }
-
-        Scan scan = context.getScan();
-        // Set producer on scan so HBase server does round robin processing
-        //setProducer(scan);
-        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
-        // The time stamp comes from the server at compile time when the meta data
-        // is resolved.
-        // TODO: include time range in explain plan?
-        PhoenixConnection connection = context.getConnection();
-        if (context.getScanTimeRange() == null) {
-            Long scn = connection.getSCN();
-            if (scn == null) {
-                scn = context.getCurrentTime();
-                // Add one to server time since max of time range is exclusive
-                // and we need to account of OSs with lower resolution clocks.
-                if (scn < HConstants.LATEST_TIMESTAMP) {
-                    scn++;
-                }
-            }
-            ScanUtil.setTimeRange(scan, scn);
-        } else {
-            try {
-                scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
-            } catch (IOException e) {
-                throw new SQLException(e);
-            }
-        }
-        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
-        ResultIterator iterator = newIterator();
-        return dependencies.isEmpty() ?
-                iterator : new DelegateResultIterator(iterator) {
-            @Override
-            public void close() throws SQLException {
-                try {
-                    super.close();
-                } finally {
-                    SQLCloseables.closeAll(dependencies);
-                }
-            }
-        };
-    }
-
-    abstract protected ResultIterator newIterator() throws SQLException;
-
-    @Override
-    public long getEstimatedSize() {
-        return DEFAULT_ESTIMATED_SIZE;
-    }
-
-    @Override
-    public ParameterMetaData getParameterMetaData() {
-        return paramMetaData;
-    }
-
-    @Override
-    public FilterableStatement getStatement() {
-        return statement;
-    }
-
-    @Override
-    public StatementContext getContext() {
-        return context;
-    }
-
-    @Override
-    public ExplainPlan getExplainPlan() throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
-        }
-
-        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
-        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
-        ResultIterator iterator = plan.iterator();
-        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
-        iterator.explain(planSteps);
-        return new ExplainPlan(planSteps);
-    }
-}
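The deleted BasicQueryPlan reappears earlier in this commit as BaseQueryPlan, unchanged apart from the new isRowKeyOrdered() method. One detail worth calling out in iterator(dependencies) is the close ordering: the wrapped iterator is closed first, and the dependencies are closed in a finally block so they are released even when the iterator's close() throws. A generic sketch of the same pattern, using java.lang.AutoCloseable rather than Phoenix's SQLCloseable types:

    import java.util.List;

    // Close a primary resource, then a list of dependent resources; the
    // dependents are always attempted, even when the primary close() throws.
    final class ClosingDelegate implements AutoCloseable {
        private final AutoCloseable delegate;
        private final List<? extends AutoCloseable> dependencies;

        ClosingDelegate(AutoCloseable delegate, List<? extends AutoCloseable> dependencies) {
            this.delegate = delegate;
            this.dependencies = dependencies;
        }

        @Override
        public void close() throws Exception {
            try {
                delegate.close();
            } finally {
                Exception first = null;
                for (AutoCloseable dep : dependencies) {
                    try {
                        dep.close();
                    } catch (Exception e) {
                        if (first == null) { first = e; }
                    }
                }
                // As in the finally block above, a dependency failure here
                // takes precedence over any error from delegate.close().
                if (first != null) { throw first; }
            }
        }
    }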
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 0e8eb50..80c4727 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.*;
 import org.apache.phoenix.schema.TableRef;
 
-public class DegenerateQueryPlan extends BasicQueryPlan {
+public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
         super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 66ad235..5ffcaeb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -79,7 +79,7 @@ public class HashJoinPlan implements QueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
 
     private final FilterableStatement statement;
-    private final BasicQueryPlan plan;
+    private final BaseQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
@@ -93,8 +93,8 @@ public class HashJoinPlan implements QueryPlan {
 
     public static HashJoinPlan create(FilterableStatement statement, QueryPlan plan,
             HashJoinInfo joinInfo, SubPlan[] subPlans) {
-        if (plan instanceof BasicQueryPlan)
-            return new HashJoinPlan(statement, (BasicQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+        if (plan instanceof BaseQueryPlan)
+            return new HashJoinPlan(statement, (BaseQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
 
         assert (plan instanceof HashJoinPlan);
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
@@ -111,7 +111,7 @@ public class HashJoinPlan implements QueryPlan {
     }
 
     private HashJoinPlan(FilterableStatement statement,
-            BasicQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+            BaseQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
         this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
@@ -462,6 +462,11 @@ public class HashJoinPlan implements QueryPlan {
         }
     }
 
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return plan.isRowKeyOrdered();
+    }
 }
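HashJoinPlan.create() wraps a BaseQueryPlan directly, while a plan that is already a HashJoinPlan has the new join metadata merged into the existing wrapper rather than being wrapped again (per the assert and the merge logic not shown in this hunk); the new isRowKeyOrdered() simply forwards to the wrapped plan, since the join itself reports whatever ordering its inner plan provides. A hedged sketch of that wrap-or-merge shape, with minimal stand-in interfaces instead of the Phoenix classes:

    // Hypothetical minimal types standing in for QueryPlan/BaseQueryPlan.
    interface Plan { boolean isRowKeyOrdered(); }

    final class JoinPlan implements Plan {
        private final Plan inner;
        JoinPlan(Plan inner) { this.inner = inner; }

        static Plan create(Plan plan) {
            if (!(plan instanceof JoinPlan)) {
                return new JoinPlan(plan);   // wrap a base plan directly
            }
            // An already-wrapped plan would have its join info merged here
            // instead of being nested inside a second wrapper (merge omitted).
            return plan;
        }

        @Override
        public boolean isRowKeyOrdered() {
            // Ordering is whatever the wrapped plan reports.
            return inner.isRowKeyOrdered();
        }
    }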
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 03deca7..dfb8fec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -56,12 +56,12 @@ import org.apache.phoenix.util.ScanUtil;
 *
 * @since 0.1
 */
-public class ScanPlan extends BasicQueryPlan {
+public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private boolean allowPageFilter;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
-        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory : buildResultIteratorFactory(context, table, orderBy));
         this.allowPageFilter = allowPageFilter;
@@ -110,7 +110,7 @@ public class ScanPlan extends BasicQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+        ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
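ParallelIterators is now constructed from the plan itself rather than from its unpacked parts (context, table, statement, projection, group-by), and the limit is only pushed into the parallel scan when page filtering is allowed and the query is unordered. The conditional, pulled out as a tiny helper for clarity (illustrative, not an actual Phoenix method):

    // Returns the limit to push into the parallel scan, or null when the
    // limit must be applied later: either page filtering is disabled, or
    // an ORDER BY forces a merge sort before the limit can be taken.
    static Integer pushableLimit(boolean allowPageFilter, boolean isOrdered, Integer limit) {
        return (!allowPageFilter || isOrdered) ? null : limit;
    }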
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 1bcfcd0..adeb2ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -105,7 +105,7 @@ public class SkipScanFilter extends FilterBase {
     }
 
     // Exposed for testing.
-    List<List<KeyRange>> getSlots() {
+    public List<List<KeyRange>> getSlots() {
        return slots;
     }
 
@@ -178,14 +178,35 @@ public class SkipScanFilter extends FilterBase {
             schema.next(ptr, 0, schema.iterator(upperExclusiveKey,ptr), slotSpan[0]);
             endPos = ScanUtil.searchClosestKeyRangeWithUpperHigherThanPtr(slots.get(0), ptr, startPos);
             // Upper range lower than first lower range of first slot, so cannot possibly be in range
-            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
-                return false;
-            }
+//            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
+//                return false;
+//            }
             // Past last position, so we can include everything from the start position
             if (endPos >= slots.get(0).size()) {
                 upperUnbound = true;
                 endPos = slots.get(0).size()-1;
+            } else if (slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) >= 0) {
+                // We know that the endPos range is higher than the previous range, but we need
+                // to test if it ends before the next range starts.
+                endPos--;
+            }
+            if (endPos < startPos) {
+                return false;
+            }
+
         }
+        // Short circuit out if we only have a single set of keys
+        if (slots.size() == 1) {
+//            int offset = slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) < 0 ? 1 : 0;
+//            if (endPos + offset <= startPos) {
+//                return false;
+//            }
+//            List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos + offset);
+            if (newSlots != null) {
+                List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos+1);
+                newSlots.add(newRanges);
+            }
+            return true;
+        }
         if (!lowerUnbound) {
             position[0] = startPos;
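In the intersect logic above, slots.get(0) holds a sorted list of disjoint key ranges. The new code trims endPos back by one when the range it points at starts at or after the exclusive upper key, reports no intersection when endPos falls below startPos, and in the single-slot case short-circuits by emitting the surviving ranges via subList(startPos, endPos+1). The same trimming is easy to follow with integer ranges standing in for byte[] KeyRanges (a hedged sketch, not the Phoenix types):

    import java.util.List;

    // Each range is {lower, upper} with upper exclusive; 'ranges' is sorted
    // and disjoint. Return the sub-list that can intersect a scan ending at
    // upperExclusiveKey, starting the search at startPos; null if empty.
    static List<int[]> trimToUpperBound(List<int[]> ranges, int startPos, int upperExclusiveKey) {
        // endPos: first range whose upper bound is above upperExclusiveKey
        // (mirrors searchClosestKeyRangeWithUpperHigherThanPtr).
        int endPos = startPos;
        while (endPos < ranges.size() && ranges.get(endPos)[1] <= upperExclusiveKey) {
            endPos++;
        }
        if (endPos >= ranges.size()) {
            endPos = ranges.size() - 1;              // upper-unbounded case
        } else if (ranges.get(endPos)[0] >= upperExclusiveKey) {
            endPos--;                                // range starts past the scan's end
        }
        if (endPos < startPos) {
            return null;                             // no intersection
        }
        return ranges.subList(startPos, endPos + 1);
    }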
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index f486a47..8822e49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -59,7 +59,6 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         }
         Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
         ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.setScanStartStopRow(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
         HRegion region = this.env.getRegion();
         RegionScanner scanner = region.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
deleted file mode 100644
index b8135d1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Default strategy for splitting regions in ParallelIterator. Refactored from the
- * original version.
- *
- *
- *
- */
-public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
-
-    protected final long guidePostsDepth;
-    protected final StatementContext context;
-    protected final PTable table;
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class);
-
-    public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
-        return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
-    }
-
-    protected DefaultParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
-        this.context = context;
-        this.table = table;
-        ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
-        this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
-    }
-
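The deleted splitter resolved its guidepost depth from the connection's properties, falling back to a hard-coded default. The lookup pattern, shown with java.util.Properties in place of Phoenix's ReadOnlyProps (the string key and default value below are illustrative stand-ins, not the actual values behind QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB):

    import java.util.Properties;

    // Read a long-valued setting with a default, as ReadOnlyProps.getLong
    // does for the guidepost depth in the deleted constructor.
    static long guidePostsDepth(Properties props) {
        String v = props.getProperty("phoenix.histogram.byte.depth");  // hypothetical key
        return v == null ? 1024L * 1024L : Long.parseLong(v);          // hypothetical default
    }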
-    // Get the mapping between key range and the regions that contains them.
-    protected List<HRegionLocation> getAllRegions() throws SQLException {
-        Scan scan = context.getScan();
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
-                .getAllTableRegions(table.getPhysicalName().getBytes());
-        // If we're not salting, then we've already intersected the minMaxRange with the scan range
-        // so there's nothing to do here.
-        return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
-    }
-
-    /**
-     * Filters out regions that intersect with key range specified by the startKey and stopKey
-     * @param allTableRegions all region infos for a given table
-     * @param startKey the lower bound of key range, inclusive
-     * @param stopKey the upper bound of key range, inclusive
-     * @return regions that intersect with the key range given by the startKey and stopKey
-     */
-    // exposed for tests
-    public static List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, byte[] startKey, byte[] stopKey) {
-        Iterable<HRegionLocation> regions;
-        final KeyRange keyRange = KeyRange.getKeyRange(startKey, true, stopKey, false);
-        if (keyRange == KeyRange.EVERYTHING_RANGE) {
-            return allTableRegions;
-        }
-
-        regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
-            @Override
-            public boolean apply(HRegionLocation location) {
-                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location
-                        .getRegionInfo().getEndKey());
-                return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
-            }
-        });
-        return Lists.newArrayList(regions);
-    }
-
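Despite its javadoc, filterRegions keeps the regions whose span intersects the scan's key range, and the stop key is exclusive (the KeyRange is built with getKeyRange(startKey, true, stopKey, false), not inclusive as the @param line claims). With integers standing in for row keys and null for HBase's empty "unbounded" byte[], the predicate reduces to:

    // Does region [regionStart, regionEnd) intersect scan [scanStart, scanStop)?
    // null stands for an unbounded endpoint (HBase's empty byte[]).
    static boolean regionIntersectsScan(Integer scanStart, Integer scanStop,
                                        Integer regionStart, Integer regionEnd) {
        boolean startsBeforeScanEnds = scanStop == null || regionStart == null || regionStart < scanStop;
        boolean endsAfterScanStarts = scanStart == null || regionEnd == null || regionEnd > scanStart;
        return startsBeforeScanEnds && endsAfterScanStarts;
    }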
-    protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
-        if (regions.isEmpty()) { return Collections.emptyList(); }
-        Scan scan = context.getScan();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
-        List<byte[]> gps = Lists.newArrayList();
-        if (!ScanUtil.isAnalyzeTable(scan)) {
-            if (table.getColumnFamilies().isEmpty()) {
-                // For sure we can get the defaultCF from the table
-                gps = table.getGuidePosts();
-            } else {
-                try {
-                    if (scan.getFamilyMap().size() > 0) {
-                        if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
-                            gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                        } else { // Otherwise, just use first CF in use by scan
-                            gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
-                        }
-                    } else {
-                        gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                    }
-                } catch (ColumnFamilyNotFoundException cfne) {
-                    // Alter table does this
-                }
-            }
-
-        }
-
-        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
-        byte[] currentKey = regions.get(0).getRegionInfo().getStartKey();
-        byte[] endKey = null;
-        int regionIndex = 0;
-        int guideIndex = 0;
-        int gpsSize = gps.size();
-        int regionSize = regions.size();
-        if (currentKey.length > 0) {
-            guideIndex = Collections.binarySearch(gps, currentKey, Bytes.BYTES_COMPARATOR);
-            guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
-        }
-        // Merge bisect with guideposts for all but the last region
-        while (regionIndex < regionSize) {
-            byte[] currentGuidePost;
-            currentKey = regions.get(regionIndex).getRegionInfo().getStartKey();
-            endKey = regions.get(regionIndex++).getRegionInfo().getEndKey();
-            while (guideIndex < gpsSize
-                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
-                KeyRange keyRange = KeyRange.getKeyRange(currentKey, currentGuidePost);
-                if (keyRange != KeyRange.EMPTY_RANGE) {
-                    guidePosts.add(keyRange);
-                }
-                currentKey = currentGuidePost;
-                guideIndex++;
-            }
-            KeyRange keyRange = KeyRange.getKeyRange(currentKey, endKey);
-            if (keyRange != KeyRange.EMPTY_RANGE) {
-                guidePosts.add(keyRange);
-            }
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("The captured guideposts are: " + guidePosts);
-        }
-        return guidePosts;
-    }
-
-    @Override
-    public List<KeyRange> getSplits() throws SQLException {
-        return genKeyRanges(getAllRegions());
-    }
-}
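genKeyRanges walks the regions in key order, splitting each region at every guidepost it contains. Collections.binarySearch encodes a miss as -(insertionPoint) - 1, so the deleted code maps either outcome to the first guidepost strictly greater than the region's start key:

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // First index in gps (sorted by cmp) holding a guidepost > key:
    // on a miss, -(i + 1) recovers the insertion point; on an exact hit,
    // i + 1 skips past the matching guidepost itself.
    static <K> int firstIndexAfter(List<K> gps, K key, Comparator<? super K> cmp) {
        int i = Collections.binarySearch(gps, key, cmp);
        return i < 0 ? -(i + 1) : i + 1;
    }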
