This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new b3a8a2f208 PHOENIX-7580: Data in last salt bucket is not being scanned for range scan (#2114) (#2109)
b3a8a2f208 is described below

commit b3a8a2f208c7fb6bdfc95c8bf16932df9b023005
Author: sanjeet006py <36011005+sanjeet00...@users.noreply.github.com>
AuthorDate: Wed Apr 16 04:53:36 2025 +0530

    PHOENIX-7580: Data in last salt bucket is not being scanned for range scan (#2114) (#2109)
---
 .../SaltedTableWithParallelStatsEnabledIT.java     | 355 +++++++++++++++++++++
 .../org/apache/phoenix/compile/ScanRanges.java     |   6 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |  14 +
 3 files changed, 374 insertions(+), 1 deletion(-)
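
PHOENIX-7580 in brief: when Phoenix plans a range scan over a salted table,
ScanRanges expands the scan once per salt bucket. An empty stop key on the
original scan means "scan to the end of the table" in HBase, but a raw byte
comparison ranks an empty byte[] as the smallest possible key, so the
per-bucket expansion stopped early and the last salt bucket was never scanned.
A minimal sketch of the pitfall (illustrative values, not part of the patch):

    import org.apache.hadoop.hbase.util.Bytes;

    public class EmptyStopKeySketch {
        public static void main(String[] args) {
            byte[] emptyStopKey = new byte[0];         // empty = unbounded scan
            byte[] nextBucketStart = new byte[] { 4 }; // start of the last of 5 salt buckets
            // Bytes.compareTo() ranks the empty key as the smallest, not the largest,
            // so an unbounded scan looks like it ends before the next bucket:
            System.out.println(Bytes.compareTo(emptyStopKey, nextBucketStart) <= 0); // true
        }
    }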

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java
new file mode 100644
index 0000000000..d3ec01b210
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+@Category(ParallelStatsEnabledIT.class)
+@RunWith(Parameterized.class)
+public class SaltedTableWithParallelStatsEnabledIT extends ParallelStatsEnabledIT {
+
+    private final boolean withStatsForParallelization;
+    private final boolean withFullTableScan;
+    private final boolean withPointLookups;
+
+    public SaltedTableWithParallelStatsEnabledIT(boolean withStatsForParallelization,
+                                          boolean withFullTableScan, boolean withPointLookups) {
+        this.withStatsForParallelization = withStatsForParallelization;
+        this.withFullTableScan = withFullTableScan;
+        this.withPointLookups = withPointLookups;
+    }
+
+    @Parameterized.Parameters(name =
+            "SaltedTableWithParallelStatsEnabledIT_withStatsForParallelization={0}, "
+                    + "withFullTableScan={1}, withPointLookups={2}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { true, false, false },
+                { false, false, false },
+                { true, true, false },
+                { false, true, false },
+                { true, false, true },
+                { false, false, true }});
+    }
+
+    @Test
+    public void testPhoenix7580() throws Exception {
+        String tableName = generateUniqueName();
+        int saltBucketCount = 5;
+        int rowsToInsert = saltBucketCount * 10;
+        String primaryKeyPrefix = "pk1_1";
+        // The values of this array are such that we have 3 values from each of the
+        // 5 salt buckets.
+        int[] pk2ValuesForPointLookups = IntStream.range(0, 15).toArray();
+        int pointLookupsPerSaltBkt = pk2ValuesForPointLookups.length / saltBucketCount;
+
+        String connProfile = "testRangeScanForPhoenix7580" + withStatsForParallelization;
+        Properties props = new Properties();
+        props.setProperty(QueryServices.USE_STATS_FOR_PARALLELIZATION,
+                withStatsForParallelization ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+        try (Connection conn = DriverManager.getConnection(getUrl(connProfile), props)) {
+            createTable(conn, tableName, saltBucketCount);
+            addRows(conn, tableName, primaryKeyPrefix, IntStream.range(0, rowsToInsert).toArray(),
+                    false);
+
+            // Run a COUNT(*) range query on Phoenix with {@code primaryKeyPrefix} as the
+            // row key prefix. Assert that the row count reported by Phoenix matches the
+            // row count in HBase.
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName);
+            }
+            else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            }
+            else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+
+            // ********** Create conditions to trigger PHOENIX-7580 ***********
+
+            // Insert 3 rows with a row key prefix greater than the prefix of the
+            // rows inserted earlier.
+            String primaryKeyPrefixForNewRows = "pk1_2";
+            // These values have been carefully selected so that the newly inserted
+            // rows go to the second-last salt bucket when the salt bucket count is 5.
+            int[] pk2ValuesForNewRows = new int[] { 1, 6, 10 };
+            triggerPhoenix7580(conn, tableName, saltBucketCount, primaryKeyPrefixForNewRows,
+                    pk2ValuesForNewRows);
+
+            // **** Conditions to trigger PHOENIX-7580 have been satisfied. Test the fix now. ****
+
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn,
+                        rowsToInsert + pk2ValuesForNewRows.length, tableName);
+            }
+            else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            }
+            else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+        }
+    }
+
+    private void assertRangeScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                          String tableName, int saltBucketCount,
+                                                          String primaryKeyPrefix)
+            throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefix.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKeyPrefix, 1,
+                rowKeyPrefix.length - 1);
+        for (int i = 0; i < saltBucketCount; i++) {
+            rowKeyPrefix[0] = (byte) i;
+            Scan scan = new Scan();
+            scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+            try (ResultScanner scanner = hTable.getScanner(scan)) {
+                while (scanner.next() != null) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert all the rows are visible on running prefix scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String rangeScanDql = "SELECT COUNT(*) FROM " + tableName + " WHERE PK1=?";
+        try (PreparedStatement stmt = conn.prepareStatement(rangeScanDql)) {
+            stmt.setString(1, primaryKeyPrefix);
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all the rows are visible on running a range query from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertFullScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                         String tableName) throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        Scan scan = new Scan();
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            while (scanner.next() != null) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert all the rows are visible on full table scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String fullScanDql = "SELECT COUNT(*) FROM " + tableName;
+        try (PreparedStatement stmt = conn.prepareStatement(fullScanDql)) {
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all the rows are visible on full table scan from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertPointLookupsRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                             String tableName, int saltBucketCount,
+                                                             String firstPrimaryKey,
+                                                             int[] pk2Values, int rowsPerSaltBkt)
+            throws Exception {
+        String secondPrimaryKeyPrefix = "pk2_";
+        String primaryKeyPrefix = firstPrimaryKey + secondPrimaryKeyPrefix;
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKey = new byte[primaryKeyPrefix.length() + 3];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKey, 1,
+                rowKey.length - 3);
+        for (int pk2Value : pk2Values) {
+            byte[] rowKeySuffix = Bytes.toBytes(String.format("%02d", pk2Value));
+            rowKey[rowKey.length - 2] = rowKeySuffix[0];
+            rowKey[rowKey.length - 1] = rowKeySuffix[1];
+            rowKey[0] = SaltingUtil.getSaltingByte(rowKey, 1, rowKey.length - 1,
+                    saltBucketCount);
+            Get get = new Get(rowKey);
+            if (!hTable.get(get).isEmpty()) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert all point lookups are visible from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        StringBuilder pointLookupDql = new StringBuilder("SELECT COUNT(*) FROM ");
+        pointLookupDql.append(tableName);
+        pointLookupDql.append(" WHERE PK1=? AND PK2 IN (?");
+        for (int i = 1; i < pk2Values.length; i++) {
+            pointLookupDql.append(",?");
+        }
+        pointLookupDql.append(")");
+        try (PreparedStatement stmt = conn.prepareStatement(pointLookupDql.toString())) {
+            stmt.setString(1, firstPrimaryKey);
+            for (int i = 0; i < pk2Values.length; i++) {
+                stmt.setString(i + 2,
+                        String.format(secondPrimaryKeyPrefix + "%02d", pk2Values[i]));
+            }
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all point lookups are visible from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void triggerPhoenix7580(Connection conn, String tableName, int saltBucketCount,
+                                    String primaryKeyPrefixForNewRows, int[] pk2ValuesForNewRows)
+            throws Exception {
+        addRows(conn, tableName, primaryKeyPrefixForNewRows, pk2ValuesForNewRows, true);
+
+        byte[] expectedEndKeyPrefixAfterSplit;
+        // Compute the split key for splitting the region corresponding to the
+        // second-last salt bucket.
+        byte[] splitKey = null;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefixForNewRows.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefixForNewRows), 0,
+                rowKeyPrefix, 1, rowKeyPrefix.length - 1);
+        // Subtract 2 from the salt bucket count to get the second-last bucket;
+        // salt buckets are 0-indexed.
+        rowKeyPrefix[0] = (byte) (saltBucketCount - 2);
+        // Save this; it is used at the end of this method to verify that the
+        // conditions to trigger PHOENIX-7580 are met.
+        expectedEndKeyPrefixAfterSplit = Bytes.copy(rowKeyPrefix);
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        Scan scan = new Scan();
+        scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+        boolean pastFirstRow = false;
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            Result r;
+            while ((r = scanner.next()) != null) {
+                if (pastFirstRow) {
+                    // Use the row key of the 2nd of the 3 newly inserted rows as the
+                    // split key later for splitting the region corresponding to the
+                    // second-last salt bucket.
+                    splitKey = r.getRow();
+                    break;
+                }
+                pastFirstRow = true;
+            }
+        }
+
+        // Identify the region corresponding to the second-last salt bucket for splitting
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        List<RegionInfo> regions = admin.getRegions(TableName.valueOf(tableName));
+        RegionInfo secondLastSaltBucketRegion = null;
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2
+                    && endKey.length > 0 && endKey[0] == saltBucketCount - 1) {
+                secondLastSaltBucketRegion = regionInfo;
+                break;
+            }
+        }
+        Assert.assertNotNull("Not able to determine region of second last salt 
bucket",
+                secondLastSaltBucketRegion);
+
+        // Split the region corresponding to the second-last salt bucket
+        admin.splitRegionAsync(secondLastSaltBucketRegion.getEncodedNameAsBytes(),
+                splitKey).get();
+        regions = admin.getRegions(TableName.valueOf(tableName));
+        // Verify that after the split the conditions to reproduce PHOENIX-7580 are met
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2) {
+                Assert.assertTrue(
+                        Bytes.compareTo(expectedEndKeyPrefixAfterSplit, endKey) < 0);
+                break;
+            }
+        }
+    }
+
+    private void createTable(Connection conn, String tableName, int saltBucketCount)
+            throws Exception {
+        String createTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " 
(\n" +
+                "    PK1 CHAR(5) NOT NULL,\n" +
+                "    PK2 CHAR(6) NOT NULL,\n" +
+                "    COL1 VARCHAR,\n" +
+                "    CONSTRAINT PK PRIMARY KEY (\n" +
+                "        PK1,\n" +
+                "        PK2 \n" +
+                "    )\n" +
+                ") SALT_BUCKETS=" + saltBucketCount;
+        // Create table
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(createTableDdl);
+        }
+    }
+
+    private void addRows(Connection conn, String tableName, String primaryKeyPrefix,
+                         int[] pk2Values, boolean skipUpdateStats) throws Exception {
+        String upsertDml = "UPSERT INTO " + tableName + " VALUES (?,?,?)";
+        // Insert rows into the table whose HBase-level row key prefix is
+        // {@code primaryKeyPrefix}.
+        try (PreparedStatement upsertStmt = conn.prepareStatement(upsertDml)) {
+            for (int i = 0; i < pk2Values.length; i++) {
+                upsertStmt.setString(1, primaryKeyPrefix);
+                upsertStmt.setString(2, String.format("pk2_%02d", 
pk2Values[i]));
+                upsertStmt.setString(3, "col1_" + i);
+                upsertStmt.executeUpdate();
+            }
+            conn.commit();
+        }
+
+        if (!skipUpdateStats) {
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute("UPDATE STATISTICS " + tableName);
+            }
+        }
+    }
+}
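
A note on the test mechanics: the helpers above place rows into specific salt
buckets by computing the salt byte the same way Phoenix does. A minimal sketch
of how a full HBase row key for this table is assembled (illustrative values,
mirroring the point-lookup helper above):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.schema.SaltingUtil;

    public class SaltByteSketch {
        public static void main(String[] args) {
            // PK1='pk1_1' (CHAR(5)) and PK2='pk2_07' (CHAR(6)) concatenate into
            // the key; the leading byte is the salt for a 5-bucket table.
            byte[] pk = Bytes.toBytes("pk1_1" + "pk2_07");
            byte[] rowKey = new byte[1 + pk.length];
            System.arraycopy(pk, 0, rowKey, 1, pk.length);
            rowKey[0] = SaltingUtil.getSaltingByte(rowKey, 1, rowKey.length - 1, 5);
            System.out.println("bucket = " + rowKey[0]);
        }
    }

The region split in triggerPhoenix7580 then arranges region boundaries inside
the second-last bucket, recreating the conditions under which the planner
previously dropped the last bucket's range.
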
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index e2e4afac19..57f9f7add6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -268,7 +268,11 @@ public class ScanRanges {
                     // This is the start of the next bucket in byte[], without the PK suffix
                     nextBucketByte = new byte[] { nextBucketStart[0] };
                 }
-                if (lastBucket || Bytes.compareTo(originalStopKey, nextBucketStart) <= 0) {
+                // PHOENIX-7580: An empty stop key is the biggest possible stop key.
+                // It needs special handling, else the byte comparison would treat an
+                // empty stop key as the smallest possible stop key.
+                if (lastBucket || originalStopKey.length > 0
+                        && Bytes.compareTo(originalStopKey, nextBucketStart) <= 0) {
                     // either we don't need to add synthetic guideposts, or we already have, and
                     // are at the last bucket of the original scan
                     addIfNotNull(newScans, intersectScan(scan, wrkStartKey, originalStopKey,
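
To make the change concrete, here is the guarded condition evaluated for an
unbounded scan (a sketch with illustrative values; lastBucket is false and the
original scan has no stop key):

    import org.apache.hadoop.hbase.util.Bytes;

    public class StopKeyGuardSketch {
        public static void main(String[] args) {
            byte[] originalStopKey = new byte[0];      // unbounded: scan to end of table
            byte[] nextBucketStart = new byte[] { 3 }; // start of the next salt bucket
            boolean lastBucket = false;

            // Before the fix: evaluates to true, so the per-bucket expansion
            // stopped early and the remaining buckets were skipped.
            boolean before = lastBucket
                    || Bytes.compareTo(originalStopKey, nextBucketStart) <= 0;

            // After the fix: evaluates to false, so the expansion continues
            // through the last bucket.
            boolean after = lastBucket || originalStopKey.length > 0
                    && Bytes.compareTo(originalStopKey, nextBucketStart) <= 0;

            System.out.println("before=" + before + ", after=" + after); // before=true, after=false
        }
    }
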
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 9b135d9e1d..8d21212931 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -430,6 +430,13 @@ public abstract class BaseTest {
         }
         return url;
     }
+
+    protected static String getUrl(String principal) throws Exception {
+        if (!clusterInitialized) {
+            throw new IllegalStateException("Cluster must be initialized 
before attempting to get the URL");
+        }
+        return getLocalClusterUrl(utility, principal);
+    }
     
     protected static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
         if (!clusterInitialized) {
@@ -556,6 +563,13 @@ public abstract class BaseTest {
         String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration());
         return url + PHOENIX_TEST_DRIVER_URL_PARAM;
     }
+
+    protected static String getLocalClusterUrl(HBaseTestingUtility util, String principal)
+            throws Exception {
+        String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration(),
+                principal);
+                principal);
+        return url + PHOENIX_TEST_DRIVER_URL_PARAM;
+    }
     
     /**
      * Initialize the cluster in distributed mode
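
For reference, the new getUrl(String principal) overload is what lets each
parameterized run of the IT above use its own connection profile. A sketch of
the usage, mirroring the test code (meant to run inside a BaseTest subclass):

    // Distinct profile per run so the USE_STATS_FOR_PARALLELIZATION setting is
    // not shared across cached connections.
    String connProfile = "testRangeScanForPhoenix7580" + withStatsForParallelization;
    Properties props = new Properties();
    props.setProperty(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.TRUE.toString());
    try (Connection conn = DriverManager.getConnection(getUrl(connProfile), props)) {
        // ... run queries against the salted table
    }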
