http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 4da593f..59fb082 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -17,12 +17,9 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; - import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -36,37 +33,39 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseables; @@ -77,6 +76,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -90,7 +90,10 @@ import com.google.common.collect.Lists; */ public class ParallelIterators extends ExplainTable implements ResultIterators { private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class); + private final List<List<Scan>> scans; private final List<KeyRange> splits; + private final PTable physicalTable; 
+ private final QueryPlan plan; private final ParallelIteratorFactory iteratorFactory; public static interface ParallelIteratorFactory { @@ -98,6 +101,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { } private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min + private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -106,10 +110,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { } }; - public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement, - RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) throws SQLException { - super(context, tableRef, groupBy); + super(plan.getContext(), plan.getTableRef(), plan.getGroupBy()); + this.plan = plan; + StatementContext context = plan.getContext(); + TableRef tableRef = plan.getTableRef(); + FilterableStatement statement = plan.getStatement(); + RowProjector projector = plan.getProjector(); MetaDataClient client = new MetaDataClient(context.getConnection()); PTable physicalTable = tableRef.getTable(); String physicalName = tableRef.getTable().getPhysicalName().getString(); @@ -128,8 +136,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { .getTable(new PTableKey(null, physicalTableName)); } } - this.splits = getSplits(context, physicalTable, statement.getHint()); - this.iteratorFactory = iteratorFactory; + this.physicalTable = physicalTable; Scan scan = context.getScan(); PTable table = tableRef.getTable(); if (projector.isProjectEmptyKeyValue()) { @@ -154,17 +161,30 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { } } } else if (table.getViewType() == ViewType.MAPPED) { - // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the - // selected column values are returned back to client - for (PColumnFamily family : table.getColumnFamilies()) { - scan.addFamily(family.getName().getBytes()); - } - } // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. - if (limit != null) { - ScanUtil.andFilterAtEnd(scan, new PageFilter(limit)); + // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the + // selected column values are returned back to client + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } + + // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. 
+ if (perScanLimit != null) { + ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); } doColumnProjectionOptimization(context, scan, table, statement); + + this.iteratorFactory = iteratorFactory; + this.scans = getParallelScans(context.getScan()); + List<List<Scan>> scans = this.scans; // Reuse the scans computed above rather than recomputing them + List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); + for (List<Scan> scanList : scans) { + for (Scan aScan : scanList) { + splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow())); + } + } + this.splits = ImmutableList.copyOf(splitRanges); } private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { @@ -247,29 +267,218 @@ } } + public List<KeyRange> getSplits() { + return splits; + } + + private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) { + int nBoundaries = regionLocations.size() - 1; + List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries); + for (int i = 0; i < nBoundaries; i++) { + HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo(); + ranges.add(regionInfo.getEndKey()); + } + return ranges; + } + + private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) { + int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR); + // If we found an exact match, return the index+1, as the inclusiveKey will be contained + // in the next region (since we're matching on the end boundary). + guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1)); + return guideIndex; + } + + private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) { + int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR); + // If we found an exact match, return the index we found as the exclusiveKey won't be + // contained in the next region as with getIndexContainingInclusive. + guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex); + return guideIndex; + } + + private List<byte[]> getGuidePosts(PTable table) { + Scan scan = context.getScan(); + boolean isPointLookup = context.getScanRanges().isPointLookup(); + byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); + List<byte[]> gps = Collections.emptyList(); + /* + * Don't use guide posts if: + * 1) We're doing a point lookup, as HBase is fast enough at those + * to not need them to be further parallelized. TODO: perf test to verify + * 2) We're collecting stats, as in this case we need to scan entire + * regions worth of data to track where to put the guide posts. + */ + if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) { + if (table.getColumnFamilies().isEmpty()) { + // For sure we can get the defaultCF from the table + return table.getGuidePosts(); + } + try { + if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) { + // If default CF is not used in scan, use first CF referenced in scan + return table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts(); + } + // Otherwise, favor use of default CF.
+ return table.getColumnFamily(defaultCF).getGuidePosts(); + } catch (ColumnFamilyNotFoundException cfne) { + // Possible after an ALTER TABLE; fall through and return no guide posts + } + } + return gps; + + } + + private static String toString(List<byte[]> gps) { + StringBuilder buf = new StringBuilder(gps.size() * 100); + buf.append("["); + for (int i = 0; i < gps.size(); i++) { + buf.append(Bytes.toStringBinary(gps.get(i))); + buf.append(","); + if (i < gps.size()-1 && (i % 10) == 0) { + buf.append("\n"); + } + } + buf.setCharAt(buf.length()-1, ']'); + return buf.toString(); + } + + private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, boolean crossedRegionBoundary) { + if (scan == null) { + return scans; + } + if (!scans.isEmpty()) { + boolean startNewScanList = false; + if (!plan.isRowKeyOrdered()) { + startNewScanList = true; + } else if (crossedRegionBoundary) { + if (physicalTable.getIndexType() == IndexType.LOCAL) { + startNewScanList = true; + } else if (physicalTable.getBucketNum() != null) { + byte[] previousStartKey = scans.get(scans.size()-1).getStartRow(); + byte[] currentStartKey = scan.getStartRow(); + byte[] prefix = ScanUtil.getPrefix(previousStartKey, SaltingUtil.NUM_SALTING_BYTES); + startNewScanList = ScanUtil.crossesPrefixBoundary(currentStartKey, prefix, SaltingUtil.NUM_SALTING_BYTES); + } + } + if (startNewScanList) { + parallelScans.add(scans); + scans = Lists.newArrayListWithExpectedSize(1); + } + } + scans.add(scan); + return scans; + } /** - * Splits the given scan's key range so that each split can be queried in parallel - * @param hintNode TODO - * - * @return the key ranges that should be scanned in parallel + * Compute the list of parallel scans to run for a given query. The inner scans + * may be concatenated together directly, while the outer lists may need to be + * merge-sorted, depending on the query. + * @return list of parallel scans to run for a given query.
+ * @throws SQLException */ - // exposed for tests - public static List<KeyRange> getSplits(StatementContext context, PTable table, HintNode hintNode) throws SQLException { - return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits(); + private List<List<Scan>> getParallelScans(final Scan scan) throws SQLException { + List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() + .getAllTableRegions(physicalTable.getPhysicalName().getBytes()); + List<byte[]> regionBoundaries = toBoundaries(regionLocations); + ScanRanges scanRanges = context.getScanRanges(); + boolean isSalted = physicalTable.getBucketNum() != null; + boolean isLocalIndex = physicalTable.getIndexType() == IndexType.LOCAL; + List<byte[]> gps = getGuidePosts(physicalTable); + if (logger.isDebugEnabled()) { + logger.debug("Guideposts: " + toString(gps)); + } + boolean traverseAllRegions = isSalted || isLocalIndex; + + byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY; + byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY; + byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY; + int regionIndex = 0; + int stopIndex = regionBoundaries.size(); + if (!traverseAllRegions) { + startKey = scan.getStartRow(); + if (startKey.length > 0) { + currentKey = startKey; + regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + } + stopKey = scan.getStopRow(); + if (stopKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + } + } + List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1); + + int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey); + int gpsSize = gps.size(); + int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1; + int keyOffset = 0; + List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion); + // Merge bisect with guideposts for all but the last region + while (regionIndex <= stopIndex) { + byte[] currentGuidePost; + byte[] endKey = regionIndex == stopIndex ? 
stopKey : regionBoundaries.get(regionIndex); + if (isLocalIndex) { + HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo(); + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey()); + } + while (guideIndex < gpsSize + && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) { + Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset); + scans = addNewScan(parallelScans, scans, newScan, false); + currentKey = currentGuidePost; + guideIndex++; + } + Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset); + scans = addNewScan(parallelScans, scans, newScan, true); + currentKey = endKey; + regionIndex++; + } + if (!scans.isEmpty()) { // Add any remaining scans + parallelScans.add(scans); + } + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans, + ScanUtil.getCustomAnnotations(scan))); + } + return parallelScans; } - private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) { - List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size()); - for (HRegionLocation region : regions) { - keyRanges.add(TO_KEY_RANGE.apply(region)); + private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) { + if (!concatIterators.isEmpty()) { + if (concatIterators.size() == 1) { + iterators.add(concatIterators.get(0)); + } else { + // TODO: should ConcatResultIterator have a constructor that takes + // a List<PeekingResultIterator>? + iterators.add(new ConcatResultIterator(new ResultIterators() { + + @Override + public List<PeekingResultIterator> getIterators() throws SQLException { + return concatIterators; + } + + @Override + public int size() { + return concatIterators.size(); + } + + @Override + public void explain(List<String> planSteps) { + // TODO: review what we should do for explain plan here + concatIterators.get(0).explain(planSteps); + } + + })); + } } - return keyRanges; } - public List<KeyRange> getSplits() { - return splits; + public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { + if (!reverse) { + return list; + } + return Lists.reverse(list); } - + /** * Executes the scan in parallel across all regions, blocking until all scans are complete. * @return the result iterators for the scan of each region @@ -277,53 +486,54 @@ } @Override public List<PeekingResultIterator> getIterators() throws SQLException { boolean success = false; + boolean isReverse = ScanUtil.isReversed(context.getScan()); final ConnectionQueryServices services = context.getConnection().getQueryServices(); ReadOnlyProps props = services.getProps(); - int numSplits = splits.size(); + int numSplits = size(); List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); - List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits); + List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); + // TODO: what purpose does this scanID serve?
final UUID scanId = UUID.randomUUID(); try { - submitWork(scanId, splits, futures); + submitWork(scanId, scans, futures, splits.size()); int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); - final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1; - // Sort futures by row key so that we have a predictable order we're getting rows back for scans. - // We're going to wait here until they're finished anyway and this makes testing much easier. - Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() { - @Override - public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) { - return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange()); - } - }); boolean clearedCache = false; - byte[] tableName = tableRef.getTable().getPhysicalName().getBytes(); - for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) { - try { - PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); - iterators.add(iterator); - } catch (ExecutionException e) { - try { // Rethrow as SQLException - throw ServerUtil.parseServerException(e); - } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date - List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2); - if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries - services.clearTableRegionCache(tableName); - clearedCache = true; - } - List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName)); - // Intersect what was the expected boundary with all new region boundaries and - // resubmit just this portion of work again - List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits); - submitWork(scanId, newSubSplits, newFutures); - for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) { - // Immediate do a get (not catching exception again) and then add the iterators we - // get back immediately. They'll be sorted as expected, since they're replacing the - // original one. 
- PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); - iterators.add(iterator); + for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { + List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size()); + for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) { + try { + PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + concatIterators.add(iterator); + } catch (ExecutionException e) { + try { // Rethrow as SQLException + throw ServerUtil.parseServerException(e); + } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date + List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2); + if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries + services.clearTableRegionCache(physicalTable.getName().getBytes()); + clearedCache = true; + } + // Resubmit just this portion of work again + Scan oldScan = scanPair.getFirst(); + List<List<Scan>> newNestedScans = this.getParallelScans(oldScan); + // Add any concatIterators that were successful so far + // as we need these to be in order + addConcatResultIterator(iterators, concatIterators); + concatIterators = Collections.emptyList(); + submitWork(scanId, newNestedScans, newFutures, newNestedScans.size()); + for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { + for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { + // Immediately do a get (not catching exception again) and then add the iterators we + // get back immediately. They'll be sorted as expected, since they're replacing the + // original one. + PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + iterators.add(iterator); + } + } } } } + addConcatResultIterator(iterators, concatIterators); } success = true; @@ -343,70 +553,80 @@ } } - private void submitWork(final UUID scanId, List<KeyRange> splits, - List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) { + private static final class ScanLocation { + private final int outerListIndex; + private final int innerListIndex; + private final Scan scan; + public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) { + this.outerListIndex = outerListIndex; + this.innerListIndex = innerListIndex; + this.scan = scan; + } + public int getOuterListIndex() { + return outerListIndex; + } + public int getInnerListIndex() { + return innerListIndex; + } + public Scan getScan() { + return scan; + } + } + private void submitWork(final UUID scanId, List<List<Scan>> nestedScans, + List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) { final ConnectionQueryServices services = context.getConnection().getQueryServices(); ExecutorService executor = services.getExecutor(); - final boolean localIndex = this.tableRef.getTable().getType() == PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL; - for (final KeyRange split : splits) { - final Scan splitScan = ScanUtil.newScan(context.getScan()); - // Intersect with existing start/stop key if the table is salted - // If not salted, we've already intersected it.
If salted, we need - // to wait until now to intersect, as we're running parallel scans - // on all the possible regions here. - if (tableRef.getTable().getBucketNum() != null) { - KeyRange minMaxRange = context.getMinMaxRange(); - if (minMaxRange != null) { - // Add salt byte based on current split, as minMaxRange won't have it - minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange); - // FIXME: seems like this should be possible when we set the scan start/stop - // in StatementContext.setScanRanges(). If it doesn't intersect the range for - // one salt byte, I don't see how it could intersect it with any of them. - if (!ScanUtil.intersectScanRange(splitScan, minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) { - continue; // Skip this chunk if no intersection based on minMaxRange - } - } - } else if (localIndex) { - // Used to detect stale region boundary information on server side - splitScan.setAttribute(EXPECTED_UPPER_REGION_KEY, split.getUpperRange()); - if (splitScan.getStartRow().length != 0 || splitScan.getStopRow().length != 0) { - SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(), - splitScan); - } - } - if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) { - Future<PeekingResultIterator> future = - executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { + // Pre-populate nestedFutures lists so that we can shuffle the scans + // and add the future to the right nested list. By shuffling the scans + // we get better utilization of the cluster since our thread executor + // will spray the scans across machines as opposed to targeting a + // single one since the scans are in row key order. 
+ List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize); + for (int i = 0; i < nestedScans.size(); i++) { + List<Scan> scans = nestedScans.get(i); + List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size()); + nestedFutures.add(futures); + for (int j = 0; j < scans.size(); j++) { + Scan scan = nestedScans.get(i).get(j); + scanLocations.add(new ScanLocation(scan, i, j)); + futures.add(null); // placeholder + } + } + Collections.shuffle(scanLocations); + for (ScanLocation scanLocation : scanLocations) { + final Scan scan = scanLocation.getScan(); + Future<PeekingResultIterator> future = + executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { - @Override - public PeekingResultIterator call() throws Exception { - long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan, ScanUtil.getCustomAnnotations(splitScan))); - } - return iteratorFactory.newIterator(context, scanner, splitScan); + @Override + public PeekingResultIterator call() throws Exception { + long startTime = System.currentTimeMillis(); + ResultIterator scanner = new TableResultIterator(context, tableRef, scan); + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } + return iteratorFactory.newIterator(context, scanner, scan); + } - /** - * Defines the grouping for round robin behavior. All threads spawned to process - * this scan will be grouped together and time sliced with other simultaneously - * executing parallel scans. - */ - @Override - public Object getJobId() { - return ParallelIterators.this; - } - }, "Parallel scanner for table: " + tableRef.getTable().getName().getString())); - futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future)); - } + /** + * Defines the grouping for round robin behavior. All threads spawned to process + * this scan will be grouped together and time sliced with other simultaneously + * executing parallel scans. + */ + @Override + public Object getJobId() { + return ParallelIterators.this; + } + }, "Parallel scanner for table: " + tableRef.getTable().getName().getString())); + // Add our future in the right place so that we can concatenate the + // results of the inner futures versus merge sorting across all of them. + nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future)); } - } @Override public int size() { - return this.splits.size(); + return this.scans.size(); } @Override @@ -418,6 +638,6 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { @Override public String toString() { - return "ParallelIterators [splits=" + splits + "]"; + return "ParallelIterators [scans=" + scans + "]"; } } \ No newline at end of file
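For readers following the rework above: getParallelScans() walks the sorted region end keys and the sorted guideposts in tandem, emitting one scan chunk per guidepost and closing out each region at its boundary, so that chunks within an inner list can be concatenated while separate outer lists may need merge sorting. Below is a minimal, self-contained sketch of that merge walk. It is hedged: String keys stand in for byte[], the unbounded last region is modeled with an explicit stopKey, and salting, local indexes, and skip-scan intersection are ignored; the class and method names are illustrative, not Phoenix APIs.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Sketch of the guidepost/region-boundary merge performed by getParallelScans().
    public class GuidepostSplitSketch {
        // Splits [startKey, stopKey) into chunks by merging sorted region end
        // keys with sorted guideposts.
        static List<String[]> split(List<String> regionEndKeys, List<String> guideposts,
                                    String startKey, String stopKey) {
            List<String[]> chunks = new ArrayList<String[]>();
            String currentKey = startKey;
            int guideIndex = indexContainingInclusive(guideposts, currentKey);
            int regionIndex = indexContainingInclusive(regionEndKeys, currentKey);
            for (; regionIndex <= regionEndKeys.size(); regionIndex++) {
                // The last region has no end key; model it with stopKey here.
                String endKey = regionIndex == regionEndKeys.size()
                        ? stopKey : regionEndKeys.get(regionIndex);
                // One chunk per guidepost falling inside the current region.
                while (guideIndex < guideposts.size()
                        && guideposts.get(guideIndex).compareTo(endKey) <= 0) {
                    chunks.add(new String[] { currentKey, guideposts.get(guideIndex) });
                    currentKey = guideposts.get(guideIndex++);
                }
                chunks.add(new String[] { currentKey, endKey }); // remainder of the region
                currentKey = endKey;
                if (currentKey.compareTo(stopKey) >= 0) {
                    break;
                }
            }
            return chunks;
        }

        // Mirrors getIndexContainingInclusive: an exact match on a boundary means
        // the key belongs to the next bucket, hence index + 1.
        static int indexContainingInclusive(List<String> boundaries, String key) {
            int i = Collections.binarySearch(boundaries, key);
            return i < 0 ? -(i + 1) : i + 1;
        }

        public static void main(String[] args) {
            // Regions end at "g" and "p"; guideposts at "d", "k", "s".
            for (String[] c : split(Arrays.asList("g", "p"), Arrays.asList("d", "k", "s"), "a", "z")) {
                System.out.println(c[0] + " .. " + c[1]);
            }
        }
    }

Run against the sample data, the sketch prints the six chunks a..d, d..g, g..k, k..p, p..s and s..z: the guidepost splits land inside regions, while the region-boundary splits are the points where addNewScan() may start a new outer list.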
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java deleted file mode 100644 index 81f5af6..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.iterate; - -import java.sql.SQLException; -import java.util.List; - -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.SaltingUtil; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - - -/** - * Split the region according to the information contained in the scan's SkipScanFilter. - */ -public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter { - - public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) { - return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode); - } - - protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) { - super(context, table, hintNode); - } - - @Override - protected List<HRegionLocation> getAllRegions() throws SQLException { - List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes()); - return filterRegions(allTableRegions, context.getScanRanges()); - } - - public List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, final ScanRanges ranges) { - Iterable<HRegionLocation> regions; - if (ranges == ScanRanges.EVERYTHING) { - return allTableRegions; - } else if (ranges == ScanRanges.NOTHING) { // TODO: why not emptyList? 
- return Lists.<HRegionLocation>newArrayList(); - } else { - regions = Iterables.filter(allTableRegions, - new Predicate<HRegionLocation>() { - @Override - public boolean apply(HRegionLocation region) { - KeyRange minMaxRange = context.getMinMaxRange(); - if (minMaxRange != null) { - KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey()); - if (table.getBucketNum() != null) { - // Add salt byte, as minMaxRange won't have it - minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange); - } - range = range.intersect(minMaxRange); - return ranges.intersect(range.getLowerRange(), range.getUpperRange()); - } - return ranges.intersect(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey()); - } - }); - } - return Lists.newArrayList(regions); - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index f7d6e14..4f67d4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -432,6 +432,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho public boolean isDegenerate() { return false; } + + @Override + public boolean isRowKeyOrdered() { + return true; + } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java index 68f786a..afcc741 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java @@ -210,6 +210,10 @@ public class KeyRange implements Writable { return compareLowerToUpperBound(b,o,l,true); } + public int compareLowerToUpperBound( byte[] b) { + return compareLowerToUpperBound(b,0,b.length); + } + /** * Compares a lower bound against an upper bound * @param b upper bound byte array @@ -237,6 +241,10 @@ public class KeyRange implements Writable { return 1; } + public int compareUpperToLowerBound(byte[] b) { + return compareUpperToLowerBound(b,0,b.length); + } + public int compareUpperToLowerBound(byte[] b, int o, int l) { return compareUpperToLowerBound(b,o,l, true); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java deleted file mode 100644 index df55fb5..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.query; - -import java.sql.SQLException; - -import org.apache.phoenix.schema.TableRef; - - -/** - * - * Interface for managing and caching table statistics. - * The frequency of updating the table statistics are controlled - * by {@link org.apache.phoenix.query.QueryServices#STATS_UPDATE_FREQ_MS_ATTRIB}. - * Table stats may also be manually updated through {@link #updateStats(TableRef)}. - * - * - * - * @since 0.1 - */ -public interface StatsManager { - /** - * Get the minimum key for the given table - * @param table the table - * @return the minimum key or null if unknown - */ - byte[] getMinKey(TableRef table); - - /** - * Get the maximum key for the given table - * @param table the table - * @return the maximum key or null if unknown - */ - byte[] getMaxKey(TableRef table); - - /** - * Manually update the cached table statistics - * @param table the table - * @throws SQLException - */ - void updateStats(TableRef table) throws SQLException; - - void clearStats() throws SQLException; -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java deleted file mode 100644 index 905802b..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.query; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.ServerUtil; -import org.apache.phoenix.util.TimeKeeper; - - -/** - * - * Implementation of StatsManager. Table stats are updated asynchronously when they're - * accessed and past time-to-live. In this case, future calls (after the asynchronous - * call has completed), will have the updated stats. - * - * All tables share the same HBase connection for a given connection and each connection - * will have it's own cache for these stats. This isn't ideal and will get reworked when - * the schema is kept on the server side. It's ok for now because: - * 1) we only ask the server for these stats when the start/end region is queried against - * 2) the query to get the stats pulls a single row so it's very cheap - * 3) it's async and if it takes too long it won't lead to anything except less optimal - * parallelization. - * - * - * @since 0.1 - */ -public class StatsManagerImpl implements StatsManager { - private final ConnectionQueryServices services; - private final int statsUpdateFrequencyMs; - private final int maxStatsAgeMs; - private final TimeKeeper timeKeeper; - private final ConcurrentMap<String,PTableStats> tableStatsMap = new ConcurrentHashMap<String,PTableStats>(); - - public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs) { - this(services, statsUpdateFrequencyMs, maxStatsAgeMs, TimeKeeper.SYSTEM); - } - - public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs, TimeKeeper timeKeeper) { - this.services = services; - this.statsUpdateFrequencyMs = statsUpdateFrequencyMs; - this.maxStatsAgeMs = maxStatsAgeMs; - this.timeKeeper = timeKeeper; - } - - public long getStatsUpdateFrequency() { - return statsUpdateFrequencyMs; - } - - @Override - public void updateStats(TableRef tableRef) throws SQLException { - SQLException sqlE = null; - HTableInterface hTable = services.getTable(tableRef.getTable().getPhysicalName().getBytes()); - try { - byte[] minKey = null, maxKey = null; - // Do a key-only scan to get the first row of a table. This is the min - // key for the table. 
- Scan scan = new Scan(HConstants.EMPTY_START_ROW, new KeyOnlyFilter()); - ResultScanner scanner = hTable.getScanner(scan); - try { - Result r = scanner.next(); - if (r != null) { - minKey = r.getRow(); - } - } finally { - scanner.close(); - } - - // Get max possible key value - scan = new Scan(); - scan.setFilter(new KeyOnlyFilter()); - scan.setReversed(true); - scanner = hTable.getScanner(scan); - try { - Result r = scanner.next(); - if (r != null) { - maxKey = r.getRow(); - } - } finally { - scanner.close(); - } - tableStatsMap.put(tableRef.getTable().getName().getString(), new PTableStats(timeKeeper.getCurrentTime(),minKey,maxKey)); - } catch (IOException e) { - sqlE = ServerUtil.parseServerException(e); - } finally { - try { - hTable.close(); - } catch (IOException e) { - if (sqlE == null) { - sqlE = ServerUtil.parseServerException(e); - } else { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } - } finally { - if (sqlE != null) { - throw sqlE; - } - } - } - } - - private PTableStats getStats(final TableRef table) { - PTableStats stats = tableStatsMap.get(table); - if (stats == null) { - PTableStats newStats = new PTableStats(); - stats = tableStatsMap.putIfAbsent(table.getTable().getName().getString(), newStats); - stats = stats == null ? newStats : stats; - } - // Synchronize on the current stats for a table to prevent - // multiple attempts to update the stats. - synchronized (stats) { - long initiatedTime = stats.getInitiatedTime(); - long currentTime = timeKeeper.getCurrentTime(); - // Update stats asynchronously if they haven't been updated within the specified frequency. - // We update asynchronously because we don't ever want to block the caller - instead we'll continue - // to use the old one. - if ( currentTime - initiatedTime >= getStatsUpdateFrequency()) { - stats.setInitiatedTime(currentTime); - services.getExecutor().submit(new Callable<Void>() { - - @Override - public Void call() throws Exception { // TODO: will exceptions be logged? 
- updateStats(table); - return null; - } - - }); - } - // If the stats are older than the max age, use an empty stats - if (currentTime - stats.getCompletedTime() >= maxStatsAgeMs) { - return PTableStats.NO_STATS; - } - } - return stats; - } - - @Override - public byte[] getMinKey(TableRef table) { - PTableStats stats = getStats(table); - return stats.getMinKey(); - } - - @Override - public byte[] getMaxKey(TableRef table) { - PTableStats stats = getStats(table); - return stats.getMaxKey(); - } - - private static class PTableStats { - private static final PTableStats NO_STATS = new PTableStats(); - private long initiatedTime; - private final long completedTime; - private final byte[] minKey; - private final byte[] maxKey; - - public PTableStats() { - this(-1,null,null); - } - public PTableStats(long completedTime, byte[] minKey, byte[] maxKey) { - this.minKey = minKey; - this.maxKey = maxKey; - this.completedTime = this.initiatedTime = completedTime; - } - - private byte[] getMinKey() { - return minKey; - } - - private byte[] getMaxKey() { - return maxKey; - } - - private long getCompletedTime() { - return completedTime; - } - - private void setInitiatedTime(long initiatedTime) { - this.initiatedTime = initiatedTime; - } - - private long getInitiatedTime() { - return initiatedTime; - } - } - - @Override - public void clearStats() throws SQLException { - tableStatsMap.clear(); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java index 6b45c5e..82ae309 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -57,8 +58,7 @@ public class StatisticsCollector { private Map<String, byte[]> minMap = Maps.newHashMap(); private Map<String, byte[]> maxMap = Maps.newHashMap(); private long guidepostDepth; - private long byteCount = 0; - private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap(); + private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap(); private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap(); protected StatisticsTable statsTable; // Ensures that either analyze or compaction happens at any point of time. @@ -135,7 +135,6 @@ public class StatisticsCollector { List<Cell> results = new ArrayList<Cell>(); boolean hasMore = true; while (hasMore) { - // Am getting duplicates here. 
Need to avoid that hasMore = scanner.next(results); collectStatistics(results); count += results.size(); @@ -289,19 +288,21 @@ public class StatisticsCollector { maxMap.put(fam, row); } } - byteCount += kv.getLength(); // TODO : This can be moved to an interface so that we could collect guide posts in different ways + Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam); + if (gps == null) { + gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList()); + guidePostsMap.put(fam, gps); + } + int byteCount = gps.getFirst() + kv.getLength(); + gps.setFirst(byteCount); if (byteCount >= guidepostDepth) { - if (guidePostsMap.get(fam) != null) { - guidePostsMap.get(fam).add( - row); - } else { - List<byte[]> guidePosts = new ArrayList<byte[]>(); - guidePosts.add(row); - guidePostsMap.put(fam, guidePosts); + // Prevent dups + List<byte[]> gpsKeys = gps.getSecond(); + if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) { + gpsKeys.add(row); + gps.setFirst(0); // Only reset count when adding guidepost } - // reset the count for the next key - byteCount = 0; } } @@ -317,16 +318,19 @@ public class StatisticsCollector { public byte[] getGuidePosts(String fam) { if (!guidePostsMap.isEmpty()) { - List<byte[]> guidePosts = guidePostsMap.get(fam); - if (guidePosts != null) { - byte[][] array = new byte[guidePosts.size()][]; - int i = 0; - for (byte[] element : guidePosts) { - array[i] = element; - i++; + Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam); + if (gps != null) { + List<byte[]> guidePosts = gps.getSecond(); + if (!guidePosts.isEmpty()) { + byte[][] array = new byte[guidePosts.size()][]; + int i = 0; + for (byte[] element : guidePosts) { + array[i] = element; + i++; + } + PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array); + return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray); } - PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array); - return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray); } } return null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java index e92d61e..bc769e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.util.ByteUtil; /** * Wrapper to access the statistics table SYSTEM.STATS using the HTable. @@ -56,7 +57,7 @@ public class StatisticsTable implements Closeable { if (table == null) { // Map the statics table and the table with which the statistics is // associated. 
This is a workaround - HTablePool pool = new HTablePool(conf,1); + HTablePool pool = new HTablePool(conf,100); //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); //h.setAutoFlushTo(true); @@ -130,6 +131,9 @@ currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); + // Add our empty column value so queries behave correctly + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, + currentTime, ByteUtil.EMPTY_BYTE_ARRAY); mutations.add(put); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index fc79173..e321c9c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -46,7 +46,6 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeySchema; import com.google.common.collect.Lists; @@ -60,6 +59,7 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class ScanUtil { + public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1]; /* * Max length that we fill our key when we turn an inclusive key * into a exclusive key. */ @@ -68,7 +68,7 @@ static { Arrays.fill(MAX_FILL_LENGTH_FOR_PREVIOUS_KEY, (byte)-1); } - public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1]; + private static final byte[] ZERO_BYTE_ARRAY = new byte[1024]; private ScanUtil() { } @@ -264,7 +264,7 @@ private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] slotSpan, Bound bound) { if (slots.isEmpty()) { - return null; + return KeyRange.UNBOUND; } int[] position = new int[slots.size()]; int maxLength = 0; @@ -276,7 +276,7 @@ byte[] key = new byte[maxLength]; int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 0, position.length); if (length == 0) { - return null; + return KeyRange.UNBOUND; } if (length == maxLength) { return key; } @@ -439,9 +439,35 @@ return keyRanges; } - public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) { + /** + * Converts a partially qualified KeyRange into a KeyRange with an + * inclusive lower bound and an exclusive upper bound, widening + * as necessary. + */ + public static KeyRange convertToInclusiveExclusiveRange(KeyRange partialRange, RowKeySchema schema, ImmutableBytesWritable ptr) { + // Ensure minMaxRange is lower inclusive and upper exclusive, as that's + // what we need to intersect against for the HBase scan.
+ byte[] lowerRange = partialRange.getLowerRange(); + if (!partialRange.lowerUnbound()) { + if (!partialRange.isLowerInclusive()) { + lowerRange = ScanUtil.nextKey(lowerRange, schema, ptr); + } + } + + byte[] upperRange = partialRange.getUpperRange(); + if (!partialRange.upperUnbound()) { + if (partialRange.isUpperInclusive()) { + upperRange = ScanUtil.nextKey(upperRange, schema, ptr); + } + } + if (partialRange.getLowerRange() != lowerRange || partialRange.getUpperRange() != upperRange) { + partialRange = KeyRange.getKeyRange(lowerRange, upperRange); + } + return partialRange; + } + + private static byte[] nextKey(byte[] key, RowKeySchema schema, ImmutableBytesWritable ptr) { int pos = 0; - RowKeySchema schema = table.getRowKeySchema(); int maxOffset = schema.iterator(key, ptr); while (schema.next(ptr, pos, maxOffset) != null) { pos++; @@ -500,6 +526,10 @@ } } + public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) { + return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length; + } + private static void setRowKeyOffset(Filter filter, int offset) { if (filter instanceof BooleanExpressionFilter) { BooleanExpressionFilter boolFilter = (BooleanExpressionFilter)filter; @@ -566,4 +596,31 @@ public static boolean isAnalyzeTable(Scan scan) { return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null; } + + public static boolean crossesPrefixBoundary(byte[] key, byte[] prefixBytes, int prefixLength) { + if (key.length < prefixLength) { + return true; + } + if (prefixBytes.length >= prefixLength) { + return Bytes.compareTo(prefixBytes, 0, prefixLength, key, 0, prefixLength) != 0; + } + return hasNonZeroLeadingBytes(key, prefixLength); + } + + public static byte[] getPrefix(byte[] startKey, int prefixLength) { + // If startKey is at beginning, then our prefix will be a null padded byte array + return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY; + } + + private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) { + if (nBytesToCheck > ZERO_BYTE_ARRAY.length) { + do { + if (Bytes.compareTo(key, nBytesToCheck - ZERO_BYTE_ARRAY.length, ZERO_BYTE_ARRAY.length, ScanUtil.ZERO_BYTE_ARRAY, 0, ScanUtil.ZERO_BYTE_ARRAY.length) != 0) { + return true; + } + nBytesToCheck -= ZERO_BYTE_ARRAY.length; + } while (nBytesToCheck > ZERO_BYTE_ARRAY.length); + } + return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, nBytesToCheck) != 0; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java new file mode 100644 index 0000000..be90399 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PDatum; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.util.ScanUtil; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class ScanRangesIntersectTest { + + @Test + public void testPointLookupIntersect() throws Exception { + RowKeySchema schema = schema(); + int[] slotSpan = ScanUtil.SINGLE_COLUMN_SLOT_SPAN; + List<KeyRange> keys = points("a","j","m","z"); + ScanRanges ranges = ScanRanges.create(schema, Collections.singletonList(keys), slotSpan); + assertIntersect(ranges, "b", "l", "j"); + + } + + private static void assertIntersect(ScanRanges ranges, String lowerRange, String upperRange, String... expectedPoints) { + List<KeyRange> expectedKeys = points(expectedPoints); + Collections.sort(expectedKeys,KeyRange.COMPARATOR); + Scan scan = new Scan(); + scan.setFilter(ranges.getSkipScanFilter()); + byte[] startKey = lowerRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(lowerRange); + byte[] stopKey = upperRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(upperRange); + Scan newScan = ranges.intersectScan(scan, startKey, stopKey, 0); + if (expectedPoints.length == 0) { + assertNull(newScan); + } else { + assertNotNull(newScan); + SkipScanFilter filter = (SkipScanFilter)newScan.getFilter(); + assertEquals(expectedKeys, filter.getSlots().get(0)); + } + } + + private static List<KeyRange> points(String...
points) { + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(points.length); + for (String point : points) { + keys.add(KeyRange.getKeyRange(PDataType.VARCHAR.toBytes(point))); + } + return keys; + } + + private static RowKeySchema schema() { + RowKeySchemaBuilder builder = new RowKeySchemaBuilder(1); + builder.addField(new PDatum() { + @Override + public boolean isNullable() { + return false; + } + @Override + public PDataType getDataType() { + return PDataType.VARCHAR; + } + @Override + public Integer getMaxLength() { + return null; + } + @Override + public Integer getScale() { + return null; + } + @Override + public SortOrder getSortOrder() { + return SortOrder.getDefault(); + } + }, false, SortOrder.getDefault()); + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java index cd88ce7..695c4c9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java @@ -72,7 +72,7 @@ public class ScanRangesTest { // incrementing the key too much. upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey); } - assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey)); + assertEquals(expectedResult, scanRanges.intersects(lowerInclusiveKey,upperExclusiveKey,0)); } @Parameters(name="{0} {2}") http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java index 3c0a952..063728c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java @@ -165,17 +165,22 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { KeyRange.getKeyRange(startKey2))); if (Bytes.compareTo(startKey1, startKey2) > 0) { expectedStartKey = startKey2; - expectedEndKey = ByteUtil.concat(startKey1, QueryConstants.SEPARATOR_BYTE_ARRAY); + expectedEndKey = startKey1; Collections.reverse(expectedRanges.get(0)); } else { expectedStartKey = startKey1; - expectedEndKey = ByteUtil.concat(startKey2, QueryConstants.SEPARATOR_BYTE_ARRAY);; + expectedEndKey = startKey2; } - assertTrue(Bytes.compareTo(expectedStartKey, startKey) == 0); - assertTrue(Bytes.compareTo(expectedEndKey, stopKey) == 0); + assertEquals(0,startKey.length); + assertEquals(0,stopKey.length); assertNotNull(filter); assertTrue(filter instanceof SkipScanFilter); + SkipScanFilter skipScanFilter = (SkipScanFilter)filter; + assertEquals(1,skipScanFilter.getSlots().size()); + assertEquals(2,skipScanFilter.getSlots().get(0).size()); + assertArrayEquals(expectedStartKey, skipScanFilter.getSlots().get(0).get(0).getLowerRange()); + assertArrayEquals(expectedEndKey, skipScanFilter.getSlots().get(0).get(1).getLowerRange()); StatementContext context = plan.getContext(); ScanRanges scanRanges = context.getScanRanges(); List<List<KeyRange>> ranges = scanRanges.getRanges(); 
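Aside on the new ScanUtil helpers above: crossesPrefixBoundary boils down to "do the leading prefixLength bytes of the key still match the region's prefix?", where a region start key shorter than the prefix implies an all-zero (null-padded) prefix, which is what hasNonZeroLeadingBytes checks. The following is a minimal standalone restatement in plain Java for illustration only; the committed code uses HBase's Bytes.compareTo and a ZERO_BYTE_ARRAY constant whose definition is not part of this hunk.

import java.util.Arrays;

public class PrefixBoundaryDemo {

    // True if the first prefixLength bytes of key no longer match the region's prefix.
    static boolean crossesPrefixBoundary(byte[] key, byte[] prefixBytes, int prefixLength) {
        if (key.length < prefixLength) {
            return true; // key too short to contain the prefix at all
        }
        if (prefixBytes.length >= prefixLength) {
            // Compare the leading prefixLength bytes of both arrays
            return !Arrays.equals(Arrays.copyOf(prefixBytes, prefixLength),
                                  Arrays.copyOf(key, prefixLength));
        }
        // Prefix came from an empty/short region start key: the implied prefix is
        // all zero bytes, so any non-zero leading byte means the boundary was crossed.
        return hasNonZeroLeadingBytes(key, prefixLength);
    }

    // True if any of the first nBytesToCheck bytes of key is non-zero.
    static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
        for (int i = 0; i < nBytesToCheck; i++) {
            if (key[i] != 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        byte[] prefix = { 1, 2 };
        System.out.println(crossesPrefixBoundary(new byte[] { 1, 2, 9 }, prefix, 2)); // false
        System.out.println(crossesPrefixBoundary(new byte[] { 1, 3, 9 }, prefix, 2)); // true
        // Empty prefix (region starts at table start): implied zero-padded prefix
        System.out.println(crossesPrefixBoundary(new byte[] { 0, 0, 7 }, new byte[0], 2)); // false
    }
}

The committed hasNonZeroLeadingBytes compares the key against ZERO_BYTE_ARRAY in chunks for efficiency, but the observable result is the same byte-by-byte zero check sketched here.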
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java index bd19663..032768b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java @@ -1185,9 +1185,8 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest { StatementContext context = compileStatement(query, binds); Scan scan = context.getScan(); Filter filter = scan.getFilter(); - assertNotNull(filter); - assertTrue(filter instanceof SkipScanFilter); - byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId)); + assertNull(filter); + byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2)); byte[] expectedStopRow = ByteUtil.concat(ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2)), QueryConstants.SEPARATOR_BYTE_ARRAY); assertArrayEquals(expectedStartRow, scan.getStartRow()); assertArrayEquals(expectedStopRow, scan.getStopRow()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java index ff31f7c..8ac322f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java @@ -27,6 +27,7 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME; import static org.apache.phoenix.util.TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL; +import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME; import static org.apache.phoenix.util.TestUtil.PTSDB_NAME; import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -103,6 +104,7 @@ public class BaseConnectionlessQueryTest extends BaseTest { ensureTableCreated(getUrl(), ENTITY_HISTORY_TABLE_NAME); ensureTableCreated(getUrl(), FUNKY_NAME); ensureTableCreated(getUrl(), PTSDB_NAME); + ensureTableCreated(getUrl(), PTSDB3_NAME); ensureTableCreated(getUrl(), MULTI_CF_NAME); ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME); ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME); @@ -110,7 +112,6 @@ public class BaseConnectionlessQueryTest extends BaseTest { ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME); ensureTableCreated(getUrl(), TABLE_WITH_ARRAY); Properties props = new Properties(); - //props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_TABLE_TIMESTAMP)); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP)); PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, 
props).unwrap(PhoenixConnection.class); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java new file mode 100644 index 0000000..fd22e47 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.query; + +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.util.QueryUtil; +import org.junit.Test; + +public class QueryPlanTest extends BaseConnectionlessQueryTest { + + @Test + public void testExplainPlan() throws Exception { + String[] queryPlans = new String[] { + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000001','000000000000005') ", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000005'] - ['000000000000001','000000000000008']", + + "SELECT host FROM PTSDB3 WHERE host IN ('na1', 'na2','na3')", + "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 KEYS OVER PTSDB3 [~'na3'] - [~'na1']\n" + + " SERVER FILTER BY FIRST KEY ONLY", + + "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT NULL AND date >= to_date('2013-01-01')", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not null]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND DATE >= '2013-01-01 00:00:00.000'", + + // Since inst IS NOT NULL is unbounded, we won't continue optimizing + "SELECT host FROM PTSDB WHERE inst IS NOT NULL AND host IS NULL AND date >= to_date('2013-01-01')", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [not null]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND (HOST IS NULL AND DATE >= '2013-01-01 00:00:00.000')", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id = '000000000000002' AND x_integer = 2 AND a_integer < 5 ", + "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + + " SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) <= ('000000000000001','000000000000005') ", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE 
['000000000000001','000000000000003'] - ['000000000000001','000000000000006']", + + "SELECT a_string,b_string FROM atable WHERE organization_id > '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000003','000000000000005') ", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000003','000000000000005'] - [*]\n" + + " SERVER FILTER BY (ENTITY_ID > '000000000000002' AND ENTITY_ID < '000000000000008')", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id >= '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000000','000000000000005') ", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000002'] - ['000000000000001','000000000000008']", + + "SELECT * FROM atable", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE", + + "SELECT inst,host FROM PTSDB WHERE inst IN ('na1', 'na2','na3') AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')", + "CLIENT PARALLEL 1-WAY SKIP SCAN ON 6 RANGES OVER PTSDB ['na1','a','2013-01-01'] - ['na3','b','2013-01-02']\n" + + " SERVER FILTER BY FIRST KEY ONLY", + + "SELECT inst,host FROM PTSDB WHERE inst LIKE 'na%' AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')", + "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 RANGES OVER PTSDB ['na','a','2013-01-01'] - ['nb','b','2013-01-02']\n" + + " SERVER FILTER BY FIRST KEY ONLY", + + "SELECT count(*) FROM atable", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO SINGLE ROW", + + "SELECT count(*) FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003 '] - ['000000000000001','004 ']\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO SINGLE ROW", + + "SELECT a_string FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003 '] - ['000000000000001','004 ']", + + "SELECT count(1) FROM atable GROUP BY a_string", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + + "CLIENT MERGE SORT", + + "SELECT count(1) FROM atable GROUP BY a_string LIMIT 5", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT 5 ROW LIMIT", + + "SELECT a_string FROM atable ORDER BY a_string DESC LIMIT 3", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER TOP 3 ROWS SORTED BY [A_STRING DESC]\n" + + "CLIENT MERGE SORT", + + "SELECT count(1) FROM atable GROUP BY a_string,b_string HAVING max(a_string) = 'a'", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT FILTER BY MAX(A_STRING) = 'a'", + + "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY ROUND(a_time,'HOUR',2),entity_id HAVING max(a_string) = 'a'", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER FILTER BY A_INTEGER = 1\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [ENTITY_ID, ROUND(A_TIME)]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT FILTER BY MAX(A_STRING) = 'a'", + + "SELECT count(1) FROM 
atable WHERE a_integer = 1 GROUP BY a_string,b_string HAVING max(a_string) = 'a' ORDER BY b_string", + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + + " SERVER FILTER BY A_INTEGER = 1\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT FILTER BY MAX(A_STRING) = 'a'\n" + + "CLIENT SORTED BY [B_STRING]", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id != '000000000000002' AND x_integer = 2 AND a_integer < 5 LIMIT 10", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + + " SERVER FILTER BY (ENTITY_ID != '000000000000002' AND X_INTEGER = 2 AND A_INTEGER < 5)\n" + + " SERVER 10 ROW LIMIT\n" + + "CLIENT 10 ROW LIMIT", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string ASC NULLS FIRST LIMIT 10", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + + " SERVER TOP 10 ROWS SORTED BY [A_STRING]\n" + + "CLIENT MERGE SORT", + + "SELECT max(a_integer) FROM atable WHERE organization_id = '000000000000001' GROUP BY organization_id,entity_id,ROUND(a_date,'HOUR') ORDER BY entity_id NULLS LAST LIMIT 10", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [ORGANIZATION_ID, ENTITY_ID, ROUND(A_DATE)]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT TOP 10 ROWS SORTED BY [ENTITY_ID NULLS LAST]", + + "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string DESC NULLS LAST LIMIT 10", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + + " SERVER TOP 10 ROWS SORTED BY [A_STRING DESC NULLS LAST]\n" + + "CLIENT MERGE SORT", + + "SELECT a_string,b_string FROM atable WHERE organization_id IN ('000000000000001', '000000000000005')", + "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 KEYS OVER ATABLE ['000000000000001'] - ['000000000000005']", + + "SELECT a_string,b_string FROM atable WHERE organization_id IN ('00D000000000001', '00D000000000005') AND entity_id IN('00E00000000000X','00E00000000000Z')", + "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 4 KEYS OVER ATABLE", + + "SELECT inst,host FROM PTSDB WHERE REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1', 'na2','na3')", + "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 RANGES OVER PTSDB ['na1'] - ['na4']\n" + + " SERVER FILTER BY FIRST KEY ONLY AND REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1','na2','na3')", + + }; + for (int i = 0; i < queryPlans.length; i+=2) { + String query = queryPlans[i]; + String plan = queryPlans[i+1]; + Properties props = new Properties(); + // Override date format so we don't have a bunch of zeros + props.setProperty(QueryServices.DATE_FORMAT_ATTRIB, "yyyy-MM-dd"); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("EXPLAIN " + query); + // TODO: figure out a way of verifying that query isn't run during explain execution + assertEquals((i/2+1) + ") " + query, plan, QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + } + +}
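For context on how QueryPlanTest compares plans above: EXPLAIN returns the plan as a single-column result set with one plan line per row, and QueryUtil.getExplainPlan joins those rows with newlines to produce the strings asserted against. A minimal hand-rolled equivalent of that pattern, sketched below; the JDBC URL is a placeholder assumption, not the connectionless URL the test uses.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ExplainPlanDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder URL for illustration; any Phoenix JDBC endpoint behaves the same
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        try {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("EXPLAIN SELECT count(*) FROM ATABLE");
            StringBuilder plan = new StringBuilder();
            String separator = "";
            // Each row of the EXPLAIN result set is one line of the plan
            while (rs.next()) {
                plan.append(separator).append(rs.getString(1));
                separator = "\n";
            }
            // Per the test above, this query should print:
            //   CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE
            //       SERVER FILTER BY FIRST KEY ONLY
            //       SERVER AGGREGATE INTO SINGLE ROW
            System.out.println(plan);
        } finally {
            conn.close();
        }
    }
}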
