This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d2fe17e015 PHOENIX-7518 Fix Flapper test in CDCQueryIT (#2066)
d2fe17e015 is described below
commit d2fe17e01508c15f5c387cc0b2a971a002a090a7
Author: tkhurana <[email protected]>
AuthorDate: Tue Jan 28 18:28:51 2025 -0800
PHOENIX-7518 Fix Flapper test in CDCQueryIT (#2066)
Co-authored-by: Tanuj Khurana <[email protected]>
---
.../org/apache/phoenix/end2end/CDCQueryIT.java | 32 ++++++++++++++++++++--
1 file changed, 29 insertions(+), 3 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 094815638c..a1519cc64b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.iterate.ResultIterator;
@@ -162,11 +163,14 @@ public class CDCQueryIT extends CDCBaseIT {
return false;
}
- private void checkIndexPartitionIdCount(Connection conn, String cdcName)
throws Exception {
+ private void checkIndexPartitionIdCount(Connection conn,
+ String tableName,
+ String cdcName) throws Exception {
+ // The number of partitions will be the number of non-empty salt buckets on the data table
+ int saltBuckets = getNonEmptySaltBucketCount(conn, tableName);
// Verify that we can use retrieve partition ids
ResultSet rs = conn.createStatement().executeQuery("SELECT
PARTITION_ID() FROM "
+ cdcName + " ORDER BY PARTITION_ID()");
- int saltBuckets = tableSaltBuckets == null ? 1 : tableSaltBuckets;
String[] partitionId = new String[saltBuckets];
int[] countPerPartition = new int[saltBuckets];
int partitionIndex = 0;
@@ -477,6 +481,11 @@ public class CDCQueryIT extends CDCBaseIT {
for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
changes.addAll(batch);
}
+ long currentTime = System.currentTimeMillis();
+ long nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (nextTime > currentTime) {
+ Thread.sleep(nextTime - currentTime);
+ }
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
addPartitionInList(conn, cdcFullName,"SELECT * FROM " +
cdcFullName)),
datatableName, dataColumns, changes, CHANGE_IMG);
@@ -493,8 +502,25 @@ public class CDCQueryIT extends CDCBaseIT {
"SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ *
FROM " + cdcFullName)),
datatableName, dataColumns, changes, ALL_IMG);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
- checkIndexPartitionIdCount(conn, cdcFullName);
+ checkIndexPartitionIdCount(conn, tableName, cdcFullName);
+ }
+ }
+
+ private int getNonEmptySaltBucketCount(Connection conn, String tableName)
throws SQLException { // counts the distinct leading-byte (salt bucket) prefixes among the row keys of tableName
+ if (tableSaltBuckets == null) { // no salt bucket count is set: report a single bucket
+ return 1;
+ }
+ Set<String> nonEmptySaltBuckets = Sets.newHashSet(); // distinct bucket-id prefixes seen so far
+ String query = "SELECT /*+ NO_INDEX */ ROWKEY_BYTES_STRING() FROM " +
tableName;
+ try (ResultSet rs = conn.createStatement().executeQuery(query)) { // NOTE(review): only the ResultSet is a managed resource; the Statement is not closed explicitly
+ while (rs.next()) {
+ String rowKey = rs.getString(1); // row key rendered in StringBinary format
+ // the first 4 characters of that string are the escaped leading byte, i.e. the salt bucket id, like "\x02"
+ String bucketID = rowKey.substring(0,4); // NOTE(review): assumes the salt byte is always a 4-char \xNN escape and the key has at least 4 chars - confirm
+ nonEmptySaltBuckets.add(bucketID);
+ }
}
+ return nonEmptySaltBuckets.size(); // one entry per distinct prefix, i.e. per bucket that has at least one row
}
private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme
immutableStorageScheme)