This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
     new b3a8a2f208 PHOENIX-7580: Data in last salt bucket is not being scanned for range scan (#2114) (#2109)

b3a8a2f208 is described below

commit b3a8a2f208c7fb6bdfc95c8bf16932df9b023005
Author: sanjeet006py <36011005+sanjeet00...@users.noreply.github.com>
AuthorDate: Wed Apr 16 04:53:36 2025 +0530

    PHOENIX-7580: Data in last salt bucket is not being scanned for range scan (#2114) (#2109)
---
 .../SaltedTableWithParallelStatsEnabledIT.java     | 355 +++++++++++++++++++++
 .../org/apache/phoenix/compile/ScanRanges.java     |   6 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |  14 +
 3 files changed, 374 insertions(+), 1 deletion(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java
new file mode 100644
index 0000000000..d3ec01b210
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+
+@Category(ParallelStatsEnabledIT.class)
+@RunWith(Parameterized.class)
+public class SaltedTableWithParallelStatsEnabledIT extends ParallelStatsEnabledIT {
+
+    private final boolean withStatsForParallelization;
+    private final boolean withFullTableScan;
+    private final boolean withPointLookups;
+
+    public SaltedTableWithParallelStatsEnabledIT(boolean withStatsForParallelization,
+            boolean withFullTableScan, boolean withPointLookups) {
+        this.withStatsForParallelization = withStatsForParallelization;
+        this.withFullTableScan = withFullTableScan;
+        this.withPointLookups = withPointLookups;
+    }
+
+    @Parameterized.Parameters(name =
+            "SaltedTableWithParallelStatsEnabledIT_withStatsForParallelization={0}, "
+                    + "withFullTableScan={1}, withPointLookups={2}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { true, false, false },
+                { false, false, false },
+                { true, true, false },
+                { false, true, false },
+                { true, false, true },
+                { false, false, true }});
+    }
+
+    @Test
+    public void testPhoenix7580() throws Exception {
+        String tableName = generateUniqueName();
+        int saltBucketCount = 5;
+        int rowsToInsert = saltBucketCount * 10;
+        String primaryKeyPrefix = "pk1_1";
+        // The values in this array are chosen so that we get 3 values from each of the
+        // 5 salt buckets.
+        int[] pk2ValuesForPointLookups = IntStream.range(0, 15).toArray();
+        int pointLookupsPerSaltBkt = pk2ValuesForPointLookups.length / saltBucketCount;
+
+        String connProfile = "testRangeScanForPhoenix7580" + withStatsForParallelization;
+        Properties props = new Properties();
+        props.setProperty(QueryServices.USE_STATS_FOR_PARALLELIZATION,
+                withStatsForParallelization ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+        try (Connection conn = DriverManager.getConnection(getUrl(connProfile), props)) {
+            createTable(conn, tableName, saltBucketCount);
+            addRows(conn, tableName, primaryKeyPrefix, IntStream.range(0, rowsToInsert).toArray(),
+                    false);
+
+            // Run a COUNT(*) range query on Phoenix with row key prefix
+            // {@code primaryKeyPrefix} and assert that the row count reported by Phoenix
+            // matches the row count in HBase.
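+            // Depending on the test parameters, the verification below instead uses a
+            // full table scan or point lookups spread across every salt bucket.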
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+
+            // ********** Create conditions to trigger PHOENIX-7580 ***********
+
+            // Insert 3 rows with a row key prefix greater than the row key prefix of the
+            // rows inserted earlier.
+            String primaryKeyPrefixForNewRows = "pk1_2";
+            // These values have been carefully selected so that the newly inserted rows go
+            // to the second-last salt bucket when the salt bucket count is 5.
+            int[] pk2ValuesForNewRows = new int[] { 1, 6, 10 };
+            triggerPhoenix7580(conn, tableName, saltBucketCount, primaryKeyPrefixForNewRows,
+                    pk2ValuesForNewRows);
+
+            // **** Conditions to trigger PHOENIX-7580 have been satisfied. Test the fix now. ****
+
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn,
+                        rowsToInsert + pk2ValuesForNewRows.length, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, pk2ValuesForPointLookups.length,
+                        tableName, saltBucketCount, primaryKeyPrefix, pk2ValuesForPointLookups,
+                        pointLookupsPerSaltBkt);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, rowsToInsert, tableName,
+                        saltBucketCount, primaryKeyPrefix);
+            }
+        }
+    }
+
+    private void assertRangeScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+            String tableName, int saltBucketCount, String primaryKeyPrefix)
+            throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefix.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKeyPrefix, 1,
+                rowKeyPrefix.length - 1);
+        for (int i = 0; i < saltBucketCount; i++) {
+            rowKeyPrefix[0] = (byte) i;
+            Scan scan = new Scan();
+            scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+            try (ResultScanner scanner = hTable.getScanner(scan)) {
+                while (scanner.next() != null) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert that all the rows are visible when running a per-bucket prefix scan
+        // directly against HBase.
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String rangeScanDql = "SELECT COUNT(*) FROM " + tableName + " WHERE PK1=?";
+        try (PreparedStatement stmt = conn.prepareStatement(rangeScanDql)) {
+            stmt.setString(1, primaryKeyPrefix);
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert that all the rows are visible when running the range query through
+            // Phoenix.
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertFullScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+            String tableName) throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        Scan scan = new Scan();
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            while (scanner.next() != null) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert that all the rows are visible on a full table scan from HBase.
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String fullScanDql = "SELECT COUNT(*) FROM " + tableName;
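+        // A COUNT(*) with no WHERE clause scans with an empty stop key, i.e. to the end
+        // of the table, so a row dropped from the last salt bucket would surface here as
+        // a count mismatch.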
+        try (PreparedStatement stmt = conn.prepareStatement(fullScanDql)) {
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert that all the rows are visible on a full table scan from Phoenix.
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void assertPointLookupsRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+            String tableName, int saltBucketCount, String firstPrimaryKey,
+            int[] pk2Values, int rowsPerSaltBkt)
+            throws Exception {
+        String secondPrimaryKeyPrefix = "pk2_";
+        String primaryKeyPrefix = firstPrimaryKey + secondPrimaryKeyPrefix;
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKey = new byte[primaryKeyPrefix.length() + 3];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKey, 1,
+                rowKey.length - 3);
+        for (int pk2Value : pk2Values) {
+            byte[] rowKeySuffix = Bytes.toBytes(String.format("%02d", pk2Value));
+            rowKey[rowKey.length - 2] = rowKeySuffix[0];
+            rowKey[rowKey.length - 1] = rowKeySuffix[1];
+            rowKey[0] = SaltingUtil.getSaltingByte(rowKey, 1, rowKey.length - 1,
+                    saltBucketCount);
+            Get get = new Get(rowKey);
+            if (!hTable.get(get).isEmpty()) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert that all point lookups are visible from HBase.
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        StringBuilder pointLookupDql = new StringBuilder("SELECT COUNT(*) FROM ");
+        pointLookupDql.append(tableName);
+        pointLookupDql.append(" WHERE PK1=? AND PK2 IN (?");
+        for (int i = 1; i < pk2Values.length; i++) {
+            pointLookupDql.append(",?");
+        }
+        pointLookupDql.append(")");
+        try (PreparedStatement stmt = conn.prepareStatement(pointLookupDql.toString())) {
+            stmt.setString(1, firstPrimaryKey);
+            for (int i = 0; i < pk2Values.length; i++) {
+                stmt.setString(i + 2,
+                        String.format(secondPrimaryKeyPrefix + "%02d", pk2Values[i]));
+            }
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert that all point lookups are visible from Phoenix.
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void triggerPhoenix7580(Connection conn, String tableName, int saltBucketCount,
+            String primaryKeyPrefixForNewRows, int[] pk2ValuesForNewRows)
+            throws Exception {
+        // skipUpdateStats=true: statistics are intentionally left stale for these new rows.
+        addRows(conn, tableName, primaryKeyPrefixForNewRows, pk2ValuesForNewRows, true);
+
+        byte[] expectedEndKeyPrefixAfterSplit;
+        // Compute the split key for splitting the region corresponding to the second-last
+        // salt bucket.
+        byte[] splitKey = null;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefixForNewRows.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefixForNewRows), 0,
+                rowKeyPrefix, 1, rowKeyPrefix.length - 1);
+        // Subtract 2 from the salt bucket count to get the second-last bucket;
+        // salt buckets are 0-indexed.
+        rowKeyPrefix[0] = (byte) (saltBucketCount - 2);
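+        // At this point rowKeyPrefix is the salt byte (saltBucketCount - 2) followed by
+        // the bytes of primaryKeyPrefixForNewRows, i.e. the HBase-level prefix of the
+        // newly inserted rows in the second-last salt bucket.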
+        // Save this; it will be used at the end of this method to verify that the
+        // conditions to trigger PHOENIX-7580 are met.
+        expectedEndKeyPrefixAfterSplit = Bytes.copy(rowKeyPrefix);
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        Scan scan = new Scan();
+        scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+        boolean pastFirstRow = false;
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            Result r;
+            while ((r = scanner.next()) != null) {
+                if (pastFirstRow) {
+                    // Use the row key of the 2nd of the 3 newly inserted rows as the split
+                    // key for splitting the region corresponding to the second-last salt
+                    // bucket.
+                    splitKey = r.getRow();
+                    break;
+                }
+                pastFirstRow = true;
+            }
+        }
+
+        // Identify the region corresponding to the second-last salt bucket for splitting.
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        List<RegionInfo> regions = admin.getRegions(TableName.valueOf(tableName));
+        RegionInfo secondLastSaltBucketRegion = null;
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2
+                    && endKey.length > 0 && endKey[0] == saltBucketCount - 1) {
+                secondLastSaltBucketRegion = regionInfo;
+                break;
+            }
+        }
+        Assert.assertNotNull("Not able to determine region of second last salt bucket",
+                secondLastSaltBucketRegion);
+
+        // Split the region corresponding to the second-last salt bucket.
+        admin.splitRegionAsync(secondLastSaltBucketRegion.getEncodedNameAsBytes(),
+                splitKey).get();
+        regions = admin.getRegions(TableName.valueOf(tableName));
+        // Verify that after the split the conditions to reproduce PHOENIX-7580 are met:
+        // the daughter region's end key now extends past the plain salt-byte prefix.
+        for (RegionInfo regionInfo : regions) {
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey = regionInfo.getEndKey();
+            if (startKey.length > 0 && startKey[0] == saltBucketCount - 2) {
+                Assert.assertTrue(
+                        Bytes.compareTo(expectedEndKeyPrefixAfterSplit, endKey) < 0);
+                break;
+            }
+        }
+    }
+
+    private void createTable(Connection conn, String tableName, int saltBucketCount)
+            throws Exception {
+        String createTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n"
+                + "    PK1 CHAR(5) NOT NULL,\n"
+                + "    PK2 CHAR(6) NOT NULL,\n"
+                + "    COL1 VARCHAR,\n"
+                + "    CONSTRAINT PK PRIMARY KEY (\n"
+                + "        PK1,\n"
+                + "        PK2 \n"
+                + "    )\n"
+                + ") SALT_BUCKETS=" + saltBucketCount;
+        // Create the table.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(createTableDdl);
+        }
+    }
+
+    private void addRows(Connection conn, String tableName, String primaryKeyPrefix,
+            int[] pk2Values, boolean skipUpdateStats) throws Exception {
+        String upsertDml = "UPSERT INTO " + tableName + " VALUES (?,?,?)";
+        // Insert rows into the table whose row key prefix at the HBase level is
+        // {@code primaryKeyPrefix}.
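+        // PK2 values are rendered as "pk2_NN" with zero padding so that the lexicographic
+        // byte order of the row keys in HBase matches the numeric order of pk2Values.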
+        try (PreparedStatement upsertStmt = conn.prepareStatement(upsertDml)) {
+            for (int i = 0; i < pk2Values.length; i++) {
+                upsertStmt.setString(1, primaryKeyPrefix);
+                upsertStmt.setString(2, String.format("pk2_%02d", pk2Values[i]));
+                upsertStmt.setString(3, "col1_" + i);
+                upsertStmt.executeUpdate();
+            }
+            conn.commit();
+        }
+
+        if (!skipUpdateStats) {
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute("UPDATE STATISTICS " + tableName);
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index e2e4afac19..57f9f7add6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -268,7 +268,11 @@ public class ScanRanges {
             // This is the start of the next bucket in byte[], without the PK suffix
             nextBucketByte = new byte[] { nextBucketStart[0] };
         }
-        if (lastBucket || Bytes.compareTo(originalStopKey, nextBucketStart) <= 0) {
+        // PHOENIX-7580: An empty stop key is the biggest possible stop key.
+        // The empty stop key needs special handling, else the byte comparison below would
+        // treat it as the smallest possible stop key.
+        if (lastBucket || (originalStopKey.length > 0
+                && Bytes.compareTo(originalStopKey, nextBucketStart) <= 0)) {
             // either we don't need to add synthetic guideposts, or we already have, and
             // are at the last bucket of the original scan
             addIfNotNull(newScans, intersectScan(scan, wrkStartKey, originalStopKey,
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 9b135d9e1d..8d21212931 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -430,6 +430,13 @@ public abstract class BaseTest {
         }
         return url;
     }
+
+    protected static String getUrl(String principal) throws Exception {
+        if (!clusterInitialized) {
+            throw new IllegalStateException("Cluster must be initialized before attempting to get the URL");
+        }
+        return getLocalClusterUrl(utility, principal);
+    }
 
     protected static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
         if (!clusterInitialized) {
@@ -556,6 +563,13 @@ public abstract class BaseTest {
         String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration());
         return url + PHOENIX_TEST_DRIVER_URL_PARAM;
     }
+
+    protected static String getLocalClusterUrl(HBaseTestingUtility util, String principal)
+            throws Exception {
+        String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration(),
+                principal);
+        return url + PHOENIX_TEST_DRIVER_URL_PARAM;
+    }
 
     /**
      * Initialize the cluster in distributed mode
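
A note on the ScanRanges change above: the added guard exists because HBase's raw byte
comparison orders an empty stop key before every non-empty key, while Scan semantics treat
an empty stop key as "run to the end of the table". The standalone sketch below is
illustrative only and not part of this commit; the class name and key values are invented.
It demonstrates the mismatch using the same Bytes utility the fix relies on:

    import org.apache.hadoop.hbase.util.Bytes;

    public class EmptyStopKeyDemo {
        public static void main(String[] args) {
            // Start key of the next salt bucket, e.g. bucket 4 of 5 (salt byte only).
            byte[] nextBucketStart = new byte[] { 4 };
            // An empty stop key on a Scan means "scan to the end of the table", which
            // makes it semantically the largest possible stop key.
            byte[] emptyStopKey = new byte[0];
            // Raw byte comparison, however, orders the empty array before every
            // non-empty key, so this prints a negative number:
            System.out.println(Bytes.compareTo(emptyStopKey, nextBucketStart));
            // The pre-fix check Bytes.compareTo(originalStopKey, nextBucketStart) <= 0
            // was therefore satisfied for an unbounded scan at every bucket boundary,
            // which, per this commit's subject, left data in the last salt bucket
            // unscanned for range scans. The fixed condition only performs the
            // comparison when originalStopKey is non-empty.
        }
    }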