This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new b0e2fc49c5 PHOENIX-6767 Traversing through all the guideposts to prepare parallel scans is not required for salted tables when the query is point lookup (#1493)
b0e2fc49c5 is described below

commit b0e2fc49c5cad639170a717240d5090f9f9f1391
Author: Rajeshbabu Chintaguntla <[email protected]>
AuthorDate: Thu Sep 15 15:18:48 2022 +0530

    PHOENIX-6767 Traversing through all the guideposts to prepare parallel scans is not required for salted tables when the query is point lookup (#1493)
    
    Co-authored-by: Rajeshbabu Chintaguntla <[email protected]>
---
 .../phoenix/end2end/salted/SaltedTableIT.java      |  29 +++++
 .../phoenix/iterate/BaseResultIterators.java       | 130 ++++++++++++---------
 2 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index 9dd1db7686..8b1bdf99aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -62,6 +62,35 @@ public class SaltedTableIT extends BaseSaltedTableIT {
         }
     }
 
+    @Test public void testPointLookupOnSaltedTable() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            String
+                    query =
+                    "create table " + tableName + " (a_integer integer not 
null "
+                            + "CONSTRAINT pk PRIMARY KEY (a_integer)) 
SALT_BUCKETS = 10";
+            conn.createStatement().execute(query);
+            PreparedStatement
+                    stmt =
+                    conn.prepareStatement("upsert into " + tableName + " 
values(?)");
+            stmt.setInt(1, 1);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.execute();
+            stmt.setInt(1, 3);
+            stmt.execute();
+            conn.commit();
+            query = "select * from " + tableName + " where a_integer = 1";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt("a_integer"));
+            query = "explain " + query;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(QueryUtil.getExplainPlan(rs).contains("POINT LOOKUP ON 
1 KEY"));
+        }
+    }
+
     @Test
     public void testTableWithSplit() throws Exception {
         try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 3807e444ee..ec6e37a69e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -266,7 +266,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         }
                     }
                 }
-                if(containsNullableGroubBy){
+                if (containsNullableGroubBy) {
                     byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
                     if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != 
null) {
                         scan.addColumn(ecf, 
EncodedColumnsUtil.getEmptyKeyValueInfo(table)
@@ -290,8 +290,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             Bytes.toBytes((long) perScanLimit));
                 }
             }
-            
-            if(offset!=null){
+
+            if (offset != null) {
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
             GroupBy groupBy = plan.getGroupBy();
@@ -612,7 +612,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         }
 
         TreeSet<byte[]> whereConditions = new 
TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-        for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+        for (Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
             byte[] cf = where.getFirst();
             if (cf != null) {
                 whereConditions.add(cf);
@@ -622,7 +622,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
         byte[] cf = null;
         if ( !table.getColumnFamilies().isEmpty() && 
!whereConditions.isEmpty() ) {
-            for(Pair<byte[], byte[]> where : 
context.getWhereConditionColumns()) {
+            for (Pair<byte[], byte[]> where : 
context.getWhereConditionColumns()) {
                 byte[] whereCF = where.getFirst();
                 if (Bytes.compareTo(defaultCF, whereCF) == 0) {
                     cf = defaultCF;
@@ -711,16 +711,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             } else {
                 endKey = regionBoundaries.get(regionIndex);
             }
-            if(ScanUtil.isLocalIndex(scan)) {
+            if (ScanUtil.isLocalIndex(scan)) {
                 ScanUtil.setLocalIndexAttributes(newScan, 0, 
regionInfo.getStartKey(),
                     regionInfo.getEndKey(), 
newScan.getAttribute(SCAN_START_ROW_SUFFIX),
                     newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
             } else {
-                if(Bytes.compareTo(scan.getStartRow(), 
regionInfo.getStartKey())<=0) {
+                if (Bytes.compareTo(scan.getStartRow(), 
regionInfo.getStartKey()) <= 0) {
                     newScan.setAttribute(SCAN_ACTUAL_START_ROW, 
regionInfo.getStartKey());
                     newScan.setStartRow(regionInfo.getStartKey());
                 }
-                if(scan.getStopRow().length == 0 || 
(regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), 
regionInfo.getEndKey())>0)) {
+                if (scan.getStopRow().length == 0 || 
(regionInfo.getEndKey().length != 0
+                        && Bytes.compareTo(scan.getStopRow(), 
regionInfo.getEndKey()) > 0)) {
                     newScan.setStopRow(regionInfo.getEndKey());
                 }
             }
@@ -928,12 +929,23 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
      * @throws SQLException
      */
     private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) 
throws SQLException {
-        List<HRegionLocation> regionLocations = 
getRegionBoundaries(scanGrouper);
-        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
         ScanRanges scanRanges = context.getScanRanges();
         PTable table = getTable();
-        boolean isSalted = table.getBucketNum() != null;
         boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+        GuidePostEstimate estimates = new GuidePostEstimate();
+        if (!isLocalIndex && scanRanges.isPointLookup() && 
!scanRanges.useSkipScanFilter()) {
+            List<List<Scan>> parallelScans = 
Lists.newArrayListWithExpectedSize(1);
+            List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
+            scans.add(context.getScan());
+            parallelScans.add(scans);
+            generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
+                    GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), 
parallelScans, estimates,
+                    Long.MAX_VALUE, false);
+            return parallelScans;
+        }
+        List<HRegionLocation> regionLocations = 
getRegionBoundaries(scanGrouper);
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        boolean isSalted = table.getBucketNum() != null;
         GuidePostsInfo gps = getGuidePosts();
         // case when stats wasn't collected
         hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
@@ -997,7 +1009,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         DataInput input = null;
         PrefixByteDecoder decoder = null;
         int guideIndex = 0;
-        GuidePostEstimate estimates = new GuidePostEstimate();
         boolean gpsForFirstRegion = false;
         boolean intersectWithGuidePosts = true;
         // Maintain min ts for gps in first or last region outside of
@@ -1129,7 +1140,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     currentKeyBytes = initialKeyBytes;
                 }
                 Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, 
endKey, keyOffset, true);
-                if(newScan != null) {
+                if (newScan != null) {
                     ScanUtil.setLocalIndexAttributes(newScan, keyOffset, 
regionInfo.getStartKey(),
                         regionInfo.getEndKey(), newScan.getStartRow(), 
newScan.getStopRow());
                     // Boundary case of no GP in region after delaying adding 
of estimates
@@ -1165,37 +1176,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             if (!scans.isEmpty()) { // Add any remaining scans
                 parallelScans.add(scans);
             }
-            Long pageLimit = getUnfilteredPageLimit(scan);
-            if (scanRanges.isPointLookup() || pageLimit != null) {
-                // If run in parallel, the limit is pushed to each parallel 
scan so must be accounted for in all of them
-                int parallelFactor = this.isSerial() ? 1 : 
parallelScans.size();
-                if (scanRanges.isPointLookup() && pageLimit != null) {
-                    this.estimatedRows = 
Long.valueOf(Math.min(scanRanges.getPointLookupCount(), pageLimit * 
parallelFactor));
-                } else if (scanRanges.isPointLookup()) {
-                    this.estimatedRows = 
Long.valueOf(scanRanges.getPointLookupCount());
-                } else {
-                    this.estimatedRows = Long.valueOf(pageLimit) * 
parallelFactor;
-                }
-                this.estimatedSize = this.estimatedRows * 
SchemaUtil.estimateRowSize(table);
-                 // Indication to client that the statistics estimates were not
-                 // calculated based on statistics but instead are based on row
-                 // limits from the query.
-               this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS;
-            } else if (emptyGuidePost) {
-                // In case of an empty guide post, we estimate the number of 
rows scanned by
-                // using the estimated row size
-                this.estimatedRows = (gps.getByteCounts()[0] / 
SchemaUtil.estimateRowSize(table));
-                this.estimatedSize = gps.getByteCounts()[0];
-                this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
-            } else if (hasGuidePosts) {
-                this.estimatedRows = estimates.rowsEstimate;
-                this.estimatedSize = estimates.bytesEstimate;
-                this.estimateInfoTimestamp = 
computeMinTimestamp(gpsAvailableForAllRegions, estimates, fallbackTs);
-            } else {
-                this.estimatedRows = null;
-                this.estimatedSize = null;
-                this.estimateInfoTimestamp = null;
-            }
+            generateEstimates(scanRanges, table, gps, emptyGuidePost, 
parallelScans, estimates,
+                    fallbackTs, gpsAvailableForAllRegions);
         } finally {
             if (stream != null) Closeables.closeQuietly(stream);
         }
@@ -1203,6 +1185,46 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return parallelScans;
     }
 
+    private void generateEstimates(ScanRanges scanRanges, PTable table, 
GuidePostsInfo gps,
+            boolean emptyGuidePost, List<List<Scan>> parallelScans, 
GuidePostEstimate estimates,
+            long fallbackTs, boolean gpsAvailableForAllRegions) {
+        Long pageLimit = getUnfilteredPageLimit(scan);
+        if (scanRanges.isPointLookup() || pageLimit != null) {
+            // If run in parallel, the limit is pushed to each parallel scan 
so must be accounted
+            // for in all of them
+            int parallelFactor = this.isSerial() ? 1 : parallelScans.size();
+            if (scanRanges.isPointLookup() && pageLimit != null) {
+                this.estimatedRows =
+                        Long.valueOf(Math.min(scanRanges.getPointLookupCount(),
+                                pageLimit * parallelFactor));
+            } else if (scanRanges.isPointLookup()) {
+                this.estimatedRows = 
Long.valueOf(scanRanges.getPointLookupCount());
+            } else {
+                this.estimatedRows = pageLimit * parallelFactor;
+            }
+            this.estimatedSize = this.estimatedRows * 
SchemaUtil.estimateRowSize(table);
+             // Indication to client that the statistics estimates were not
+             // calculated based on statistics but instead are based on row
+             // limits from the query.
+            this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS;
+        } else if (emptyGuidePost) {
+            // In case of an empty guide post, we estimate the number of rows 
scanned by
+            // using the estimated row size
+            this.estimatedRows = gps.getByteCounts()[0] / 
SchemaUtil.estimateRowSize(table);
+            this.estimatedSize = gps.getByteCounts()[0];
+            this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
+        } else if (hasGuidePosts) {
+            this.estimatedRows = estimates.rowsEstimate;
+            this.estimatedSize = estimates.bytesEstimate;
+            this.estimateInfoTimestamp = 
computeMinTimestamp(gpsAvailableForAllRegions, estimates,
+                    fallbackTs);
+        } else {
+            this.estimatedRows = null;
+            this.estimatedSize = null;
+            this.estimateInfoTimestamp = null;
+        }
+    }
+
     /**
      * Return row count limit of PageFilter if exists and there is no where
      * clause filter.
@@ -1248,13 +1270,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
      * @param parallelScans
      */
     private void sampleScans(final List<List<Scan>> parallelScans, final 
Double tableSamplingRate){
-       if(tableSamplingRate==null||tableSamplingRate==100d) return;
-       final Predicate<byte[]> 
tableSamplerPredicate=TableSamplerPredicate.of(tableSamplingRate);
-       
-       for(Iterator<List<Scan>> is = parallelScans.iterator();is.hasNext();){
-               for(Iterator<Scan> i=is.next().iterator();i.hasNext();){
+        if (tableSamplingRate == null || tableSamplingRate == 100d) {
+            return;
+        }
+        final Predicate<byte[]> tableSamplerPredicate = 
TableSamplerPredicate.of(tableSamplingRate);
+
+        for (Iterator<List<Scan>> is = parallelScans.iterator(); 
is.hasNext();) {
+            for (Iterator<Scan> i = is.next().iterator(); i.hasNext();) {
                        final Scan scan=i.next();
-                       
if(!tableSamplerPredicate.apply(scan.getStartRow())){i.remove();}
+                if (!tableSamplerPredicate.apply(scan.getStartRow())) {
+                    i.remove();
+                }
                }
        }
     }
@@ -1360,10 +1386,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Resubmit just this portion of work again
                             Scan oldScan = scanPair.getFirst();
                             byte[] startKey = 
oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
-                            if(e2 instanceof HashJoinCacheNotFoundException){
+                            if (e2 instanceof HashJoinCacheNotFoundException) {
                                 LOGGER.debug(
                                         "Retrying when Hash Join cache is not 
found on the server ,by sending the cache again");
-                                if(retryCount<=0){
+                                if (retryCount <= 0) {
                                     throw e2;
                                 }
                                 Long cacheId = 
((HashJoinCacheNotFoundException)e2).getCacheId();

Reply via email to