This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new b0e2fc49c5 PHOENIX-6767 Traversing through all the guideposts to
prepare parallel scans is not required for salted tables when the query is
point lookup (#1493)
b0e2fc49c5 is described below
commit b0e2fc49c5cad639170a717240d5090f9f9f1391
Author: Rajeshbabu Chintaguntla <[email protected]>
AuthorDate: Thu Sep 15 15:18:48 2022 +0530
PHOENIX-6767 Traversing through all the guideposts to prepare parallel
scans is not required for salted tables when the query is point lookup (#1493)
Co-authored-by: Rajeshbabu Chintaguntla <[email protected]>
---
.../phoenix/end2end/salted/SaltedTableIT.java | 29 +++++
.../phoenix/iterate/BaseResultIterators.java | 130 ++++++++++++---------
2 files changed, 107 insertions(+), 52 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index 9dd1db7686..8b1bdf99aa 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -62,6 +62,35 @@ public class SaltedTableIT extends BaseSaltedTableIT {
}
}
+ @Test public void testPointLookupOnSaltedTable() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String tableName = generateUniqueName();
+ String
+ query =
+ "create table " + tableName + " (a_integer integer not null "
+ + "CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 10";
+ conn.createStatement().execute(query);
+ PreparedStatement
+ stmt =
+ conn.prepareStatement("upsert into " + tableName + " values(?)");
+ stmt.setInt(1, 1);
+ stmt.execute();
+ stmt.setInt(1, 2);
+ stmt.execute();
+ stmt.setInt(1, 3);
+ stmt.execute();
+ conn.commit();
+ query = "select * from " + tableName + " where a_integer = 1";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt("a_integer"));
+ query = "explain " + query;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(QueryUtil.getExplainPlan(rs).contains("POINT LOOKUP ON 1 KEY"));
+ }
+ }
+
@Test
public void testTableWithSplit() throws Exception {
try {
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 3807e444ee..ec6e37a69e 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -266,7 +266,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
}
}
}
- if(containsNullableGroubBy){
+ if (containsNullableGroubBy) {
byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
if (!familyMap.containsKey(ecf) || familyMap.get(ecf) !=
null) {
scan.addColumn(ecf,
EncodedColumnsUtil.getEmptyKeyValueInfo(table)
@@ -290,8 +290,8 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
Bytes.toBytes((long) perScanLimit));
}
}
-
- if(offset!=null){
+
+ if (offset != null) {
ScanUtil.addOffsetAttribute(scan, offset);
}
GroupBy groupBy = plan.getGroupBy();
@@ -612,7 +612,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
}
TreeSet<byte[]> whereConditions = new
TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+ for (Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
byte[] cf = where.getFirst();
if (cf != null) {
whereConditions.add(cf);
@@ -622,7 +622,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
byte[] cf = null;
if ( !table.getColumnFamilies().isEmpty() &&
!whereConditions.isEmpty() ) {
- for(Pair<byte[], byte[]> where :
context.getWhereConditionColumns()) {
+ for (Pair<byte[], byte[]> where :
context.getWhereConditionColumns()) {
byte[] whereCF = where.getFirst();
if (Bytes.compareTo(defaultCF, whereCF) == 0) {
cf = defaultCF;
@@ -711,16 +711,17 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
} else {
endKey = regionBoundaries.get(regionIndex);
}
- if(ScanUtil.isLocalIndex(scan)) {
+ if (ScanUtil.isLocalIndex(scan)) {
ScanUtil.setLocalIndexAttributes(newScan, 0,
regionInfo.getStartKey(),
regionInfo.getEndKey(),
newScan.getAttribute(SCAN_START_ROW_SUFFIX),
newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
} else {
- if(Bytes.compareTo(scan.getStartRow(),
regionInfo.getStartKey())<=0) {
+ if (Bytes.compareTo(scan.getStartRow(),
regionInfo.getStartKey()) <= 0) {
newScan.setAttribute(SCAN_ACTUAL_START_ROW,
regionInfo.getStartKey());
newScan.setStartRow(regionInfo.getStartKey());
}
- if(scan.getStopRow().length == 0 ||
(regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(),
regionInfo.getEndKey())>0)) {
+ if (scan.getStopRow().length == 0 ||
(regionInfo.getEndKey().length != 0
+ && Bytes.compareTo(scan.getStopRow(),
regionInfo.getEndKey()) > 0)) {
newScan.setStopRow(regionInfo.getEndKey());
}
}
@@ -928,12 +929,23 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
* @throws SQLException
*/
private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey)
throws SQLException {
- List<HRegionLocation> regionLocations =
getRegionBoundaries(scanGrouper);
- List<byte[]> regionBoundaries = toBoundaries(regionLocations);
ScanRanges scanRanges = context.getScanRanges();
PTable table = getTable();
- boolean isSalted = table.getBucketNum() != null;
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+ GuidePostEstimate estimates = new GuidePostEstimate();
+ if (!isLocalIndex && scanRanges.isPointLookup() &&
!scanRanges.useSkipScanFilter()) {
+ List<List<Scan>> parallelScans =
Lists.newArrayListWithExpectedSize(1);
+ List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
+ scans.add(context.getScan());
+ parallelScans.add(scans);
+ generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
+ GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(),
parallelScans, estimates,
+ Long.MAX_VALUE, false);
+ return parallelScans;
+ }
+ List<HRegionLocation> regionLocations =
getRegionBoundaries(scanGrouper);
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ boolean isSalted = table.getBucketNum() != null;
GuidePostsInfo gps = getGuidePosts();
// case when stats wasn't collected
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
@@ -997,7 +1009,6 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
DataInput input = null;
PrefixByteDecoder decoder = null;
int guideIndex = 0;
- GuidePostEstimate estimates = new GuidePostEstimate();
boolean gpsForFirstRegion = false;
boolean intersectWithGuidePosts = true;
// Maintain min ts for gps in first or last region outside of
@@ -1129,7 +1140,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
currentKeyBytes = initialKeyBytes;
}
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes,
endKey, keyOffset, true);
- if(newScan != null) {
+ if (newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(),
newScan.getStopRow());
// Boundary case of no GP in region after delaying adding of estimates
@@ -1165,37 +1176,8 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
if (!scans.isEmpty()) { // Add any remaining scans
parallelScans.add(scans);
}
- Long pageLimit = getUnfilteredPageLimit(scan);
- if (scanRanges.isPointLookup() || pageLimit != null) {
- // If run in parallel, the limit is pushed to each parallel scan so must be accounted for in all of them
- int parallelFactor = this.isSerial() ? 1 :
parallelScans.size();
- if (scanRanges.isPointLookup() && pageLimit != null) {
- this.estimatedRows =
Long.valueOf(Math.min(scanRanges.getPointLookupCount(), pageLimit *
parallelFactor));
- } else if (scanRanges.isPointLookup()) {
- this.estimatedRows =
Long.valueOf(scanRanges.getPointLookupCount());
- } else {
- this.estimatedRows = Long.valueOf(pageLimit) *
parallelFactor;
- }
- this.estimatedSize = this.estimatedRows *
SchemaUtil.estimateRowSize(table);
- // Indication to client that the statistics estimates were not
- // calculated based on statistics but instead are based on row
- // limits from the query.
- this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS;
- } else if (emptyGuidePost) {
- // In case of an empty guide post, we estimate the number of rows scanned by
- // using the estimated row size
- this.estimatedRows = (gps.getByteCounts()[0] /
SchemaUtil.estimateRowSize(table));
- this.estimatedSize = gps.getByteCounts()[0];
- this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
- } else if (hasGuidePosts) {
- this.estimatedRows = estimates.rowsEstimate;
- this.estimatedSize = estimates.bytesEstimate;
- this.estimateInfoTimestamp =
computeMinTimestamp(gpsAvailableForAllRegions, estimates, fallbackTs);
- } else {
- this.estimatedRows = null;
- this.estimatedSize = null;
- this.estimateInfoTimestamp = null;
- }
+ generateEstimates(scanRanges, table, gps, emptyGuidePost,
parallelScans, estimates,
+ fallbackTs, gpsAvailableForAllRegions);
} finally {
if (stream != null) Closeables.closeQuietly(stream);
}
@@ -1203,6 +1185,46 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
return parallelScans;
}
+ private void generateEstimates(ScanRanges scanRanges, PTable table,
GuidePostsInfo gps,
+ boolean emptyGuidePost, List<List<Scan>> parallelScans,
GuidePostEstimate estimates,
+ long fallbackTs, boolean gpsAvailableForAllRegions) {
+ Long pageLimit = getUnfilteredPageLimit(scan);
+ if (scanRanges.isPointLookup() || pageLimit != null) {
+ // If run in parallel, the limit is pushed to each parallel scan so must be accounted
+ // for in all of them
+ int parallelFactor = this.isSerial() ? 1 : parallelScans.size();
+ if (scanRanges.isPointLookup() && pageLimit != null) {
+ this.estimatedRows =
+ Long.valueOf(Math.min(scanRanges.getPointLookupCount(),
+ pageLimit * parallelFactor));
+ } else if (scanRanges.isPointLookup()) {
+ this.estimatedRows =
Long.valueOf(scanRanges.getPointLookupCount());
+ } else {
+ this.estimatedRows = pageLimit * parallelFactor;
+ }
+ this.estimatedSize = this.estimatedRows *
SchemaUtil.estimateRowSize(table);
+ // Indication to client that the statistics estimates were not
+ // calculated based on statistics but instead are based on row
+ // limits from the query.
+ this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS;
+ } else if (emptyGuidePost) {
+ // In case of an empty guide post, we estimate the number of rows scanned by
+ // using the estimated row size
+ this.estimatedRows = gps.getByteCounts()[0] /
SchemaUtil.estimateRowSize(table);
+ this.estimatedSize = gps.getByteCounts()[0];
+ this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
+ } else if (hasGuidePosts) {
+ this.estimatedRows = estimates.rowsEstimate;
+ this.estimatedSize = estimates.bytesEstimate;
+ this.estimateInfoTimestamp =
computeMinTimestamp(gpsAvailableForAllRegions, estimates,
+ fallbackTs);
+ } else {
+ this.estimatedRows = null;
+ this.estimatedSize = null;
+ this.estimateInfoTimestamp = null;
+ }
+ }
+
/**
* Return row count limit of PageFilter if exists and there is no where
* clause filter.
@@ -1248,13 +1270,17 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
* @param parallelScans
*/
private void sampleScans(final List<List<Scan>> parallelScans, final
Double tableSamplingRate){
- if(tableSamplingRate==null||tableSamplingRate==100d) return;
- final Predicate<byte[]>
tableSamplerPredicate=TableSamplerPredicate.of(tableSamplingRate);
-
- for(Iterator<List<Scan>> is = parallelScans.iterator();is.hasNext();){
- for(Iterator<Scan> i=is.next().iterator();i.hasNext();){
+ if (tableSamplingRate == null || tableSamplingRate == 100d) {
+ return;
+ }
+ final Predicate<byte[]> tableSamplerPredicate =
TableSamplerPredicate.of(tableSamplingRate);
+
+ for (Iterator<List<Scan>> is = parallelScans.iterator();
is.hasNext();) {
+ for (Iterator<Scan> i = is.next().iterator(); i.hasNext();) {
final Scan scan=i.next();
-
if(!tableSamplerPredicate.apply(scan.getStartRow())){i.remove();}
+ if (!tableSamplerPredicate.apply(scan.getStartRow())) {
+ i.remove();
+ }
}
}
}
@@ -1360,10 +1386,10 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
// Resubmit just this portion of work again
Scan oldScan = scanPair.getFirst();
byte[] startKey =
oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
- if(e2 instanceof HashJoinCacheNotFoundException){
+ if (e2 instanceof HashJoinCacheNotFoundException) {
LOGGER.debug(
"Retrying when Hash Join cache is not found on the server ,by sending the cache again");
- if(retryCount<=0){
+ if (retryCount <= 0) {
throw e2;
}
Long cacheId =
((HashJoinCacheNotFoundException)e2).getCacheId();