This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d2fe17e015 PHOENIX-7518 Fix Flapper test in CDCQueryIT (#2066)
d2fe17e015 is described below

commit d2fe17e01508c15f5c387cc0b2a971a002a090a7
Author: tkhurana <[email protected]>
AuthorDate: Tue Jan 28 18:28:51 2025 -0800

    PHOENIX-7518 Fix Flapper test in CDCQueryIT (#2066)
    
    Co-authored-by: Tanuj Khurana <[email protected]>
---
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 32 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 3 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 094815638c..a1519cc64b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -162,11 +163,14 @@ public class CDCQueryIT extends CDCBaseIT {
         return false;
     }
 
-    private void checkIndexPartitionIdCount(Connection conn, String cdcName) throws Exception {
+    private void checkIndexPartitionIdCount(Connection conn,
+                                            String tableName,
+                                            String cdcName) throws Exception {
+        // The number of partitions will be the number of non-empty salt buckets on the data table
+        int saltBuckets = getNonEmptySaltBucketCount(conn, tableName);
         // Verify that we can use retrieve partition ids
         ResultSet rs = conn.createStatement().executeQuery("SELECT PARTITION_ID() FROM "
                 + cdcName + " ORDER BY PARTITION_ID()");
-        int saltBuckets = tableSaltBuckets == null ? 1 : tableSaltBuckets;
         String[] partitionId = new String[saltBuckets];
         int[] countPerPartition = new int[saltBuckets];
         int partitionIndex = 0;
@@ -477,6 +481,11 @@ public class CDCQueryIT extends CDCBaseIT {
             for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
                 changes.addAll(batch);
             }
+            long currentTime = System.currentTimeMillis();
+            long nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+            if (nextTime > currentTime) {
+                Thread.sleep(nextTime - currentTime);
+            }
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
                     addPartitionInList(conn, cdcFullName,"SELECT * FROM " + cdcFullName)),
                     datatableName, dataColumns, changes, CHANGE_IMG);
@@ -493,8 +502,25 @@ public class CDCQueryIT extends CDCBaseIT {
                             "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName)),
                     datatableName, dataColumns, changes, ALL_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
-            checkIndexPartitionIdCount(conn, cdcFullName);
+            checkIndexPartitionIdCount(conn, tableName, cdcFullName);
+        }
+    }
+
+    private int getNonEmptySaltBucketCount(Connection conn, String tableName) throws SQLException {
+        if (tableSaltBuckets == null) {
+            return 1;
+        }
+        Set<String> nonEmptySaltBuckets = Sets.newHashSet();
+        String query = "SELECT /*+ NO_INDEX */ ROWKEY_BYTES_STRING() FROM " + tableName;
+        try (ResultSet rs = conn.createStatement().executeQuery(query)) {
+            while (rs.next()) {
+                String rowKey = rs.getString(1); // StringBinary format
+                // the first 4 bytes will have the salt bucket id like "\x02"
+                String bucketID = rowKey.substring(0,4);
+                nonEmptySaltBuckets.add(bucketID);
+            }
         }
+        return nonEmptySaltBuckets.size();
     }
 
     private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme)

Reply via email to