dbwong commented on a change in pull request #482: PHOENIX-4925 Use Segment
tree to organize Guide Post Info
URL: https://github.com/apache/phoenix/pull/482#discussion_r277906809
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
##########
@@ -885,292 +867,215 @@ private static boolean clipKeyRangeBytes(RowKeySchema
schema, int fieldIndex, in
return maxOffset != offset;
}
+ private List<KeyRange> getRowKeyRanges(List<HRegionLocation>
regionLocations, boolean isLocalIndex) {
+ List<KeyRange> queryKeyRanges = null;
+
+ // Use the dataplan to build the queryRowKeyRanges
+ if (isLocalIndex) {
+ // TODO: when implementing PHOENIX-4585, we should change this to
an assert
+ // as we should always have a data plan when a local index is
being used.
+ if (dataPlan != null &&
dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity
check
+ int columnsInCommon = computeColumnsInCommon();
+ ScanRanges prefixScanRanges =
computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon);
+ List<KeyRange> queryKeyRangesTemp =
prefixScanRanges.getRowKeyRanges();
+
+ if (queryKeyRangesTemp.size() == 1 &&
queryKeyRangesTemp.get(0) == KeyRange.EVERYTHING_RANGE) {
+ queryKeyRanges = queryKeyRangesTemp;
+ } else {
+ List<KeyRange> newQueryRowKeyRanges =
Lists.newArrayListWithExpectedSize(
+ queryKeyRangesTemp.size() *
regionLocations.size());
+
+ for (HRegionLocation regionLocation : regionLocations) {
+ HRegionInfo regionInfo =
regionLocation.getRegionInfo();
+
+ // Only attempt further pruning if the prefix range is
using
+ // a skip scan since we've already pruned the range of
regions
+ // based on the start/stop key.
+ if (columnsInCommon > 0 &&
prefixScanRanges.useSkipScanFilter()) {
+ byte[] regionStartKey = regionInfo.getStartKey();
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ clipKeyRangeBytes(prefixScanRanges.getSchema(), 0,
+ columnsInCommon, regionStartKey, ptr,
false);
+ regionStartKey =
ByteUtil.copyKeyBytesIfNecessary(ptr);
+ // Prune this region if there's no intersection
+ if
(!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(),
false)) {
+ continue;
+ }
+ }
+
+ for (KeyRange queryKeyRange : queryKeyRangesTemp) {
+ KeyRange newQueryRowKeyRange =
queryKeyRange.prependRange(
+
regionInfo.getStartKey(),0,regionInfo.getStartKey().length);
+ newQueryRowKeyRanges.add(newQueryRowKeyRange);
+ }
+ }
+
+ queryKeyRanges = newQueryRowKeyRanges;
+ }
+ }
+ }
+
+ if (queryKeyRanges == null) {
+ ScanRanges scanRanges = context.getScanRanges();
+ queryKeyRanges = scanRanges.getRowKeyRanges();
+ }
+
+ return queryKeyRanges;
+ }
+
/**
* Compute the list of parallel scans to run for a given query. The inner
scans
* may be concatenated together directly, while the other ones may need to
be
- * merge sorted, depending on the query.
- * Also computes an estimated bytes scanned, rows scanned, and last update
time
- * of statistics. To compute correctly, we need to handle a couple of edge
cases:
- * 1) if a guidepost is equal to the start key of the scan.
- * 2) If a guidepost is equal to the end region key.
- * In both cases, we set a flag (delayAddingEst) which indicates that the
previous
- * gp should be use in our stats calculation. The normal case is that a gp
is
- * encountered which is in the scan range in which case it is simply added
to
- * our calculation.
- * For the last update time, we use the min timestamp of the gp that are in
- * range of the scans that will be issued. If we find no gp in the range,
we use
- * the gp in the first or last region of the scan. If we encounter a
region with
- * no gp, then we return a null value as an indication that we don't know
with
- * certainty when the stats were updated last. This handles the case of a
split
- * occurring for a large ingest with stats never having been calculated
for the
- * new region.
+ * merge sorted, depending on the query. Also computes an estimated bytes
scanned,
+ * rows scanned, and last update time of statistics.
+ *
* @return list of parallel scans to run for a given query.
* @throws SQLException
*/
private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey)
throws SQLException {
+ ScanRanges scanRanges = context.getScanRanges();
List<HRegionLocation> regionLocations =
getRegionBoundaries(scanGrouper);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
- ScanRanges scanRanges = context.getScanRanges();
+
PTable table = getTable();
boolean isSalted = table.getBucketNum() != null;
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
- GuidePostsInfo gps = getGuidePosts();
- // case when stats wasn't collected
- hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
- // Case when stats collection did run but there possibly wasn't enough
data. In such a
- // case we generate an empty guide post with the byte estimate being
set as guide post
- // width.
- boolean emptyGuidePost = gps.isEmptyGuidePost();
- byte[] startRegionBoundaryKey = startKey;
- byte[] stopRegionBoundaryKey = stopKey;
- int columnsInCommon = 0;
- ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
- boolean traverseAllRegions = isSalted || isLocalIndex;
- if (isLocalIndex) {
- // TODO: when implementing PHOENIX-4585, we should change this to
an assert
- // as we should always have a data plan when a local index is
being used.
- if (dataPlan != null &&
dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity
check
- prefixScanRanges =
computePrefixScanRanges(dataPlan.getContext().getScanRanges(),
columnsInCommon=computeColumnsInCommon());
- KeyRange prefixRange = prefixScanRanges.getScanRange();
- if (!prefixRange.lowerUnbound()) {
- startRegionBoundaryKey = prefixRange.getLowerRange();
- }
- if (!prefixRange.upperUnbound()) {
- stopRegionBoundaryKey = prefixRange.getUpperRange();
- }
+ // We'll never have a case where a table is both salted and local.
+ assert !(isSalted && isLocalIndex);
+
+ Long pageLimit = getUnfilteredPageLimit(scan);
+ boolean estimateUsingStats = ! (scanRanges.isPointLookup() ||
pageLimit != null);
+
+ GuidePostEstimation estimationFromStats = null;
+ GuidePostsInfo guidePostsInfo = getGuidePosts();
+ boolean hasGuidePosts = guidePostsInfo != GuidePostsInfo.NO_GUIDEPOST;
+ List<Pair<Integer, List<KeyRange>>> parallelScanRangesGroupedByRegion;
+
+ // Get the Query KeyRanges from skip filters
+ List<KeyRange> queryKeyRanges = getRowKeyRanges(regionLocations,
isLocalIndex);
+
+ if (this.useStatsForParallelization && hasGuidePosts) {
+ Pair<List<KeyRange>, GuidePostEstimation> result =
guidePostsInfo.generateParallelScanRanges(queryKeyRanges);
+
+ parallelScanRangesGroupedByRegion =
ScanUtil.splitKeyRangesByBoundaries(regionBoundaries, result.getFirst());
+
+ if (estimateUsingStats) {
+ estimationFromStats = result.getSecond();
}
- } else if (!traverseAllRegions) {
- byte[] scanStartRow = scan.getStartRow();
- if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow,
startKey) > 0) {
- startRegionBoundaryKey = startKey = scanStartRow;
+ }
+ else {
+ // Get the Query KeyRanges which are grouped by region
+ List<Pair<Integer, List<KeyRange>>> queryKeyRangesGroupedByRegion =
+ ScanUtil.splitKeyRangesByBoundaries(regionBoundaries,
queryKeyRanges);
+ parallelScanRangesGroupedByRegion =
Lists.newArrayListWithCapacity(queryKeyRangesGroupedByRegion.size());
+
+ for (Pair<Integer, List<KeyRange>> pair :
queryKeyRangesGroupedByRegion) {
+ Integer regionIndex = pair.getFirst();
+ List<KeyRange> queryKeyRangesPerRegion = pair.getSecond();
+ assert (queryKeyRangesPerRegion.size() > 0);
+
+ assert (!
queryKeyRangesPerRegion.get(queryKeyRangesPerRegion.size() -
1).isUpperInclusive());
+ byte[] endKey =
queryKeyRangesPerRegion.get(queryKeyRangesPerRegion.size() - 1).getUpperRange();
+ KeyRange scanKeyRange =
KeyRange.getKeyRange(queryKeyRangesPerRegion.get(0).getLowerRange(),
+ queryKeyRangesPerRegion.get(0).isLowerInclusive(),
endKey, false);
+ parallelScanRangesGroupedByRegion.add(
+ new Pair<Integer, List<KeyRange>>(regionIndex,
Lists.newArrayList(scanKeyRange)));
}
- byte[] scanStopRow = scan.getStopRow();
- if (stopKey.length == 0
- || (scanStopRow.length != 0 &&
Bytes.compareTo(scanStopRow, stopKey) < 0)) {
- stopRegionBoundaryKey = stopKey = scanStopRow;
+
+ if (estimateUsingStats && hasGuidePosts) {
+ estimationFromStats =
guidePostsInfo.getEstimationOnly(queryKeyRanges);
}
}
- int regionIndex = 0;
- int startRegionIndex = 0;
- int stopIndex = regionBoundaries.size();
- if (startRegionBoundaryKey.length > 0) {
- startRegionIndex = regionIndex =
getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
- }
- if (stopRegionBoundaryKey.length > 0) {
- stopIndex = Math.min(stopIndex, regionIndex +
getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex),
stopRegionBoundaryKey));
+ // Generate parallel scan for each region
+ int countOfRegionsToScan = parallelScanRangesGroupedByRegion.size();
+ List<List<Scan>> parallelScans =
Lists.newArrayListWithExpectedSize(countOfRegionsToScan);
+ List<Scan> regionScans = Lists.newArrayListWithExpectedSize(1);
+
+ for (int i = 0; i < countOfRegionsToScan; i++) {
+ Integer regionIndex =
parallelScanRangesGroupedByRegion.get(i).getFirst();
+ List<KeyRange> regionScanKeyRanges =
parallelScanRangesGroupedByRegion.get(i).getSecond();
+ int keyRangeCount = regionScanKeyRanges.size();
+ assert (keyRangeCount > 0);
+
+ HRegionLocation regionLocation = regionLocations.get(regionIndex);
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
+
+ int keyOffset = 0;
if (isLocalIndex) {
- stopKey =
regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+ keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(),
regionInfo.getEndKey());
}
- }
- List<List<Scan>> parallelScans =
Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
- ImmutableBytesWritable currentKey = new
ImmutableBytesWritable(startKey);
-
- int gpsSize = gps.getGuidePostsCount();
- int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize /
regionLocations.size() + 1;
- int keyOffset = 0;
- ImmutableBytesWritable currentGuidePost =
ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
- List<Scan> scans =
Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
- ImmutableBytesWritable guidePosts = gps.getGuidePosts();
- ByteArrayInputStream stream = null;
- DataInput input = null;
- PrefixByteDecoder decoder = null;
- int guideIndex = 0;
- GuidePostEstimate estimates = new GuidePostEstimate();
- boolean gpsForFirstRegion = false;
- boolean intersectWithGuidePosts = true;
- // Maintain min ts for gps in first or last region outside of
- // gps that are in the scan range. We'll use this if we find
- // no gps in range.
- long fallbackTs = Long.MAX_VALUE;
- // Determination of whether of not we found a guidepost in
- // every region between the start and stop key. If not, then
- // we cannot definitively say at what time the guideposts
- // were collected.
- boolean gpsAvailableForAllRegions = true;
- try {
- boolean delayAddingEst = false;
- ImmutableBytesWritable firstRegionStartKey = null;
- if (gpsSize > 0) {
- stream = new ByteArrayInputStream(guidePosts.get(),
guidePosts.getOffset(), guidePosts.getLength());
- input = new DataInputStream(stream);
- decoder = new PrefixByteDecoder(gps.getMaxLength());
- firstRegionStartKey = new
ImmutableBytesWritable(regionLocations.get(regionIndex).getRegionInfo().getStartKey());
- try {
- int c;
- // Continue walking guideposts until we get past the
currentKey
- while ((c=currentKey.compareTo(currentGuidePost =
PrefixByteCodec.decode(decoder, input))) >= 0) {
- // Detect if we found a guidepost that might be in the
first region. This
- // is for the case where the start key may be past the
only guidepost in
- // the first region.
- if (!gpsForFirstRegion &&
firstRegionStartKey.compareTo(currentGuidePost) <= 0) {
- gpsForFirstRegion = true;
- }
- // While we have gps in the region (but outside of
start/stop key), track
- // the min ts as a fallback for the time at which stas
were calculated.
- if (gpsForFirstRegion) {
- fallbackTs =
- Math.min(fallbackTs,
-
gps.getGuidePostTimestamps()[guideIndex]);
- }
- // Special case for gp == startKey in which case we
want to
- // count this gp (if it's in range) though we go past
it.
- delayAddingEst = (c == 0);
- guideIndex++;
- }
- } catch (EOFException e) {
- // expected. Thrown when we have decoded all guide posts.
- intersectWithGuidePosts = false;
- }
- }
- byte[] endRegionKey =
regionLocations.get(stopIndex).getRegionInfo().getEndKey();
- byte[] currentKeyBytes = currentKey.copyBytes();
- intersectWithGuidePosts &= guideIndex < gpsSize;
- // Merge bisect with guideposts for all but the last region
- while (regionIndex <= stopIndex) {
- HRegionLocation regionLocation =
regionLocations.get(regionIndex);
- HRegionInfo regionInfo = regionLocation.getRegionInfo();
- byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
- byte[] endKey;
- if (regionIndex == stopIndex) {
- endKey = stopKey;
- } else {
- endKey = regionBoundaries.get(regionIndex);
- }
- if (isLocalIndex) {
- // Only attempt further pruning if the prefix range is
using
- // a skip scan since we've already pruned the range of
regions
- // based on the start/stop key.
- if (columnsInCommon > 0 &&
prefixScanRanges.useSkipScanFilter()) {
- byte[] regionStartKey = regionInfo.getStartKey();
- ImmutableBytesWritable ptr = context.getTempPtr();
- clipKeyRangeBytes(prefixScanRanges.getSchema(), 0,
columnsInCommon, regionStartKey, ptr, false);
- regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
- // Prune this region if there's no intersection
- if (!prefixScanRanges.intersectRegion(regionStartKey,
regionInfo.getEndKey(), false)) {
- currentKeyBytes = endKey;
- regionIndex++;
- continue;
- }
- }
- keyOffset =
ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
- }
- byte[] initialKeyBytes = currentKeyBytes;
- int gpsComparedToEndKey = -1;
- boolean everNotDelayed = false;
- while (intersectWithGuidePosts && (endKey.length == 0 ||
(gpsComparedToEndKey=currentGuidePost.compareTo(endKey)) <= 0)) {
- Scan newScan = scanRanges.intersectScan(scan,
currentKeyBytes, currentGuidePostBytes, keyOffset,
- false);
- if (newScan != null) {
- ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
- regionInfo.getStartKey(), regionInfo.getEndKey(),
- newScan.getStartRow(), newScan.getStopRow());
- // If we've delaying adding estimates, add the previous
- // gp estimates now that we know they are in range.
- if (delayAddingEst) {
- updateEstimates(gps, guideIndex-1, estimates);
- }
- // If we're not delaying adding estimates, add the
- // current gp estimates.
- if (! (delayAddingEst = gpsComparedToEndKey == 0) ) {
- updateEstimates(gps, guideIndex, estimates);
- }
- } else {
- delayAddingEst = false;
- }
- everNotDelayed |= !delayAddingEst;
- scans = addNewScan(parallelScans, scans, newScan,
currentGuidePostBytes, false, regionLocation);
- currentKeyBytes = currentGuidePostBytes;
- try {
- currentGuidePost = PrefixByteCodec.decode(decoder,
input);
- currentGuidePostBytes = currentGuidePost.copyBytes();
- guideIndex++;
- } catch (EOFException e) {
- // We have read all guide posts
- intersectWithGuidePosts = false;
- }
- }
- boolean gpsInThisRegion = initialKeyBytes != currentKeyBytes;
- if (!useStatsForParallelization) {
- /*
- * If we are not using stats for generating parallel
scans, we need to reset the
- * currentKey back to what it was at the beginning of the
loop.
- */
- currentKeyBytes = initialKeyBytes;
+ // Make a scan for every range in regionScanKeyRanges
+ for (int j = 0; j < keyRangeCount; j++) {
+ KeyRange keyRange = regionScanKeyRanges.get(j);
+ boolean crossesRegionBoundary = false;
+
+ if (j == keyRangeCount - 1) {
+ crossesRegionBoundary = true;
}
- Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes,
endKey, keyOffset, true);
- if(newScan != null) {
+
+ Scan newScan = scanRanges.intersectScan(
+ scan, keyRange.getLowerRange(),
keyRange.getUpperRange(), keyOffset, crossesRegionBoundary);
+ if (newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
regionInfo.getStartKey(),
- regionInfo.getEndKey(), newScan.getStartRow(),
newScan.getStopRow());
- // Boundary case of no GP in region after delaying adding
of estimates
- if (!gpsInThisRegion && delayAddingEst) {
- updateEstimates(gps, guideIndex-1, estimates);
- gpsInThisRegion = true;
- delayAddingEst = false;
- }
- } else if (!gpsInThisRegion) {
- delayAddingEst = false;
+ regionInfo.getEndKey(), newScan.getStartRow(),
newScan.getStopRow());
+ } else {
+ logger.warn("Didn't generate scan for " +
keyRange.toString());
}
- scans = addNewScan(parallelScans, scans, newScan, endKey,
true, regionLocation);
- currentKeyBytes = endKey;
- // We have a guide post in the region if the above loop was
entered
- // or if the current key is less than the region end key
(since the loop
- // may not have been entered if our scan end key is smaller
than the
- // first guide post in that region).
- boolean gpsAfterStopKey = false;
- gpsAvailableForAllRegions &=
- ( gpsInThisRegion && everNotDelayed) || // GP in this
region
- ( regionIndex == startRegionIndex && gpsForFirstRegion )
|| // GP in first region (before start key)
- ( gpsAfterStopKey = ( regionIndex == stopIndex &&
intersectWithGuidePosts && // GP in last region (after stop key)
- ( endRegionKey.length == 0 || // then check if gp
is in the region
- currentGuidePost.compareTo(endRegionKey) < 0) ) );
- if (gpsAfterStopKey) {
- // If gp after stop key, but still in last region, track
min ts as fallback
- fallbackTs =
- Math.min(fallbackTs,
- gps.getGuidePostTimestamps()[guideIndex]);
+
+ byte[] endKey;
+ if ((j < keyRangeCount - 1) || (i == countOfRegionsToScan -
1)) {
+ endKey = keyRange.getUpperRange();
+ } else {
+ endKey = regionInfo.getEndKey();
}
- regionIndex++;
+
+ regionScans = addNewScan(parallelScans, regionScans, newScan,
endKey, crossesRegionBoundary, regionLocation);
Review comment:
Let's discuss the naming and usage of this argument a bit: in ParallelScanGrouper
it is named "startKey". It is used for the crossesPrefixBoundary check: the
interface documents it as a start key, and it is compared to the last scan's
startRow prefix to detect, for example, the salt byte changing or the region
changing. By passing the end key instead, are there any cases we might miss?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services