haridsv commented on code in PR #2109:
URL: https://github.com/apache/phoenix/pull/2109#discussion_r2041524512


##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableWithParallelStatsEnabledIT.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+
+@Category(ParallelStatsEnabledIT.class)
+public class SaltedTableWithParallelStatsEnabledIT extends 
ParallelStatsEnabledIT {
+
+    /**
+     * Runs one PHOENIX-7580 scenario against a freshly created salted table.
+     * <p>
+     * The row counts reported by HBase and by Phoenix are verified twice: once right after the
+     * initial load, and once more after {@code triggerPhoenix7580} has been invoked. Exactly one
+     * access path is exercised per invocation: a full table scan, a batch of point lookups, or
+     * (when both flags are {@code false}) a range scan on the PK1 prefix.
+     *
+     * @param withStatsForParallelization value set on the connection for
+     *                                    {@link QueryServices#USE_STATS_FOR_PARALLELIZATION}
+     * @param withFullTableScan           verify through a full table scan; checked before
+     *                                    {@code withPointLookups}
+     * @param withPointLookups            verify through point lookups
+     * @throws Exception if table setup, data load or any verification step fails
+     */
+    private void testPhoenix7580(boolean withStatsForParallelization, boolean withFullTableScan,
+                                 boolean withPointLookups)
+            throws Exception {
+        String tableName = generateUniqueName();
+        int bucketCount = 5;
+        int initialRowCount = bucketCount * 10;
+        String pk1Prefix = "pk1_1";
+        // Three values per salt bucket, listed in bucket order (the first three are from salt
+        // bucket 0, the next three from salt bucket 1, and so on). They were specifically
+        // selected so that the point lookups cover all 5 salt buckets.
+        int[] lookupPk2Values = { 4, 9, 13, 0, 5, 14, 1, 6, 10, 2, 7, 11, 3, 8, 12 };
+        int lookupsPerBucket = lookupPk2Values.length / bucketCount;
+
+        String connProfile = "testRangeScanForPhoenix7580" + withStatsForParallelization;
+        Properties connProps = new Properties();
+        connProps.setProperty(QueryServices.USE_STATS_FOR_PARALLELIZATION,
+                Boolean.toString(withStatsForParallelization));
+        try (Connection conn = DriverManager.getConnection(getUrl(connProfile), connProps)) {
+            createTable(conn, tableName, bucketCount);
+            addRows(conn, tableName, pk1Prefix, initialRowCount);
+
+            // Baseline check: the row count reported by Phoenix for the selected access path
+            // must match the row count read directly from HBase.
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn, initialRowCount, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, lookupPk2Values.length,
+                        tableName, bucketCount, pk1Prefix, lookupPk2Values, lookupsPerBucket);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, initialRowCount, tableName,
+                        bucketCount, pk1Prefix);
+            }
+
+            // ********** Create conditions to trigger PHOENIX-7580 ***********
+
+            // Add 3 rows whose row key prefix sorts after the prefix of the rows loaded above.
+            String pk1PrefixForNewRows = "pk1_2";
+            // Carefully selected so that, with 5 salt buckets, the new rows go to the second
+            // last salt bucket.
+            int[] newRowPk2Values = { 1, 6, 10 };
+            triggerPhoenix7580(conn, tableName, bucketCount, pk1PrefixForNewRows,
+                    newRowPk2Values);
+
+            // **** Conditions to trigger PHOENIX-7580 have been satisfied. Test the fix now. ****
+
+            // Only the full scan sees the extra rows; the other two paths still filter on the
+            // original PK1 prefix, so their expected counts are unchanged.
+            if (withFullTableScan) {
+                assertFullScanRowCntFromHBaseAndPhoenix(conn,
+                        initialRowCount + newRowPk2Values.length, tableName);
+            } else if (withPointLookups) {
+                assertPointLookupsRowCntFromHBaseAndPhoenix(conn, lookupPk2Values.length,
+                        tableName, bucketCount, pk1Prefix, lookupPk2Values, lookupsPerBucket);
+            } else {
+                assertRangeScanRowCntFromHBaseAndPhoenix(conn, initialRowCount, tableName,
+                        bucketCount, pk1Prefix);
+            }
+        }
+    }
+
+    private void assertRangeScanRowCntFromHBaseAndPhoenix(Connection conn, int 
expectedRowCount,
+                                                          String tableName, 
int saltBucketCount,
+                                                          String 
primaryKeyPrefix)
+            throws Exception {
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefix.length() + 1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKeyPrefix, 1,
+                rowKeyPrefix.length - 1);
+        for (int i = 0; i< saltBucketCount; i++) {
+            rowKeyPrefix[0] = (byte) i;
+            Scan scan = new Scan();
+            scan.setStartStopRowForPrefixScan(rowKeyPrefix);
+            try (ResultScanner scanner = hTable.getScanner(scan)) {
+                while(scanner.next() != null) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert all the rows are visible on running prefix scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        String rangeScanDql = "SELECT COUNT(*) FROM " + tableName + " WHERE 
PK1=?";
+        try (PreparedStatement stmt = conn.prepareStatement(rangeScanDql)) {
+            stmt.setString(1, primaryKeyPrefix);
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all the rows are visible on running range query from 
Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    /**
+     * Counts every row of the table twice -- through an unbounded HBase scan and through a
+     * Phoenix {@code SELECT COUNT(*)} without a WHERE clause -- and asserts that both counts
+     * equal {@code expectedRowCount}.
+     *
+     * @param conn             open Phoenix connection; must unwrap to {@link PhoenixConnection}
+     * @param expectedRowCount number of rows expected from both HBase and Phoenix
+     * @param tableName        name of the table under test
+     * @throws Exception if the HBase scan or the Phoenix query fails
+     */
+    private void assertFullScanRowCntFromHBaseAndPhoenix(Connection conn, int expectedRowCount,
+                                                         String tableName) throws Exception {
+        int rowCountFromHBase = 0;
+        // Table is Closeable: release it even if the scan throws.
+        try (Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(Bytes.toBytes(tableName));
+             ResultScanner scanner = hTable.getScanner(new Scan())) {
+            while (scanner.next() != null) {
+                rowCountFromHBase++;
+            }
+        }
+        // Assert all the rows are visible on full table scan from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+
+        String fullScanDql = "SELECT COUNT(*) FROM " + tableName;
+        // try-with-resources closes the ResultSet even when an assertion below fails.
+        try (PreparedStatement stmt = conn.prepareStatement(fullScanDql);
+             ResultSet rs = stmt.executeQuery()) {
+            Assert.assertTrue("COUNT(*) query returned no row", rs.next());
+            // Assert all the rows are visible on full table scan from Phoenix
+            Assert.assertEquals(expectedRowCount, rs.getInt(1));
+        }
+    }
+
+    private void assertPointLookupsRowCntFromHBaseAndPhoenix(Connection conn, 
int expectedRowCount,
+                                                             String tableName, 
int saltBucketCount,
+                                                             String 
firstPrimaryKey,
+                                                             int[] pk2Values, 
int rowsPerSaltBkt)
+            throws Exception {
+        String secondPrimaryKeyPrefix = "pk2_";
+        String primaryKeyPrefix = firstPrimaryKey + secondPrimaryKeyPrefix;
+        Table hTable = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices().getTable(tableName.getBytes());
+        int rowCountFromHBase = 0;
+        byte[] rowKey = new byte[primaryKeyPrefix.length() + 3];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefix), 0, rowKey, 1,
+                rowKey.length - 3);
+        for (int i = 0; i < saltBucketCount; i++) {
+            for (int j = i * rowsPerSaltBkt; j < (i + 1) * rowsPerSaltBkt; 
j++) {
+                rowKey[0] = (byte) i;
+                byte[] rowKeySuffix = Bytes.toBytes(String.format("%02d", 
pk2Values[j]));
+                rowKey[rowKey.length - 2] = rowKeySuffix[0];
+                rowKey[rowKey.length - 1] = rowKeySuffix[1];
+                Get get = new Get(rowKey);
+                if (!hTable.get(get).isEmpty()) {
+                    rowCountFromHBase++;
+                }
+            }
+        }
+        // Assert all point lookups are visible from HBase
+        Assert.assertEquals(expectedRowCount, rowCountFromHBase);
+        StringBuilder pointLookupDql = new StringBuilder("SELECT COUNT(*) FROM 
");
+        pointLookupDql.append(tableName);
+        pointLookupDql.append(" WHERE PK1=? AND PK2 IN (?");
+        for (int i = 1; i < pk2Values.length; i++) {
+            pointLookupDql.append(",?");
+        }
+        pointLookupDql.append(")");
+        try (PreparedStatement stmt = 
conn.prepareStatement(pointLookupDql.toString())) {
+            stmt.setString(1, firstPrimaryKey);
+            for (int i = 0; i < pk2Values.length; i++) {
+                stmt.setString(i + 2,
+                        String.format(secondPrimaryKeyPrefix + "%02d", i));
+            }
+            ResultSet rs = stmt.executeQuery();
+            rs.next();
+            int rowsVisible = rs.getInt(1);
+            rs.close();
+            // Assert all point lookups are visible from Phoenix
+            Assert.assertEquals(expectedRowCount, rowsVisible);
+        }
+    }
+
+    private void triggerPhoenix7580(Connection conn, String tableName, int 
saltBucketCount,
+                                    String primaryKeyPrefixForNewRows, int[] 
pk2ValuesForNewRows)
+            throws Exception {
+        String upsertDml = "UPSERT INTO " + tableName + " VALUES (?,?,?)";
+        try (PreparedStatement stmt = conn.prepareStatement(upsertDml)) {
+            for (int i: pk2ValuesForNewRows) {
+                stmt.setString(1, primaryKeyPrefixForNewRows);
+                stmt.setString(2, String.format("pk2_%02d", i));
+                stmt.setString(3, "col1_" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
+
+        byte[] expectedEndKeyPrefixAfterSplit;
+        // Compute split key for splitting region corresponding to the second 
last salt bucket.
+        byte[] splitKey = null;
+        byte[] rowKeyPrefix = new byte[primaryKeyPrefixForNewRows.length() + 
1];
+        System.arraycopy(Bytes.toBytes(primaryKeyPrefixForNewRows), 0,
+                rowKeyPrefix, 1, rowKeyPrefix.length - 1);
+        // Doing minus 2 from salt bucket count to get second last bucket.
+        // Salt buckets are 0 indexed.
+        rowKeyPrefix[0] = (byte) (saltBucketCount - 2);
+        expectedEndKeyPrefixAfterSplit = new byte[rowKeyPrefix.length];
+        // Save this and will be used to verify that conditions to trigger 
PHOENIX-7580 are
+        // being met at the end of this method call.
+        System.arraycopy(rowKeyPrefix, 0,
+                expectedEndKeyPrefixAfterSplit, 0, rowKeyPrefix.length);

Review Comment:
   I meant to suggest replacing the separate array allocation and the
   `System.arraycopy` call with a single statement:
   `expectedEndKeyPrefixAfterSplit = Bytes.copy(rowKeyPrefix)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to