This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 125994216f PHOENIX-7566 Fix periodic ZK->SYSTEM.HA_GROUP sync: pair 
ZK/HDFS URLs and avoid redundant rewrites (#2521)
125994216f is described below

commit 125994216f4fa7604f51b1df0869441f21a35719
Author: ritegarg <[email protected]>
AuthorDate: Fri Jun 12 11:09:34 2026 -0700

    PHOENIX-7566 Fix periodic ZK->SYSTEM.HA_GROUP sync: pair ZK/HDFS URLs and 
avoid redundant rewrites (#2521)
    
    The periodic ZK->SYSTEM.HA_GROUP sync 
(HAGroupStoreClient#syncZKToSystemTable)
    rewrote the slot-indexed ZK_URL_n / CLUSTER_URL_n / CLUSTER_ROLE_n columns 
in a
    local-first order but never wrote HDFS_URL_1 / HDFS_URL_2. On the cluster 
whose
    ZK URL did not already occupy slot 1, this flipped the ZK/cluster/role 
columns
    while leaving the HDFS columns in place, so ZK_URL_n no longer referred to 
the
    same cluster as HDFS_URL_n. The read path (getSystemTableHAGroupRecord) 
resolves
    a cluster's HDFS URL by matching its ZK URL slot, so the swap propagated a 
wrong
    local/peer HDFS URL into the ZK record and replication wrote to the wrong IN
    directory.
    
    Fix: updateSystemTableHAGroupRecordSilently now writes all slot columns
    (including HDFS_URL_1/2) in a deterministic canonical order keyed on the
    formatted ZK URL (mirroring ClusterRoleRecord's url1/url2 canonicalization).
    Because the order does not depend on which cluster is local, both clusters 
of a
    pair persist identical rows and ZK_URL_n always stays paired with 
HDFS_URL_n; the
    sync self-heals an already-desynced row on its next cycle.
    
    The sync also formats the ZK URLs in the candidate record it compares 
against the
    system table, so the skip-if-unchanged check matches once converged instead 
of
    rewriting (and logging) the row on every cycle.
    
    The periodic-sync IT seeds the corrupted (mispaired) shape, asserts the row 
is
    re-paired into canonical slot order, and asserts a subsequent cycle does not
    rewrite the row.
    
    Co-authored-by: Ritesh Garg 
<[email protected]>
    Co-authored-by: Cursor <[email protected]>
---
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    |  69 ++++++--
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 179 +++++++++++----------
 2 files changed, 153 insertions(+), 95 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 9d2c268eae..6543f291f3 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -673,6 +673,29 @@ public class HAGroupStoreClient implements Closeable {
     valuesClause.append("?");
     parameters.add(haGroupName);
 
+    // Choose a deterministic slot order keyed on the formatted ZK URL (slot 1 
= the smaller URL),
+    // independent of which cluster is local, so both clusters write identical 
rows and each slot
+    // keeps its ZK/CLUSTER/ROLE/HDFS columns pointing at one cluster (the 
read path resolves a
+    // cluster's HDFS URL by matching its ZK URL slot).
+    String localZk =
+      record.getZkUrl() != null ? JDBCUtil.formatUrl(record.getZkUrl(), 
RegistryType.ZK) : null;
+    String peerZk = record.getPeerZKUrl() != null
+      ? JDBCUtil.formatUrl(record.getPeerZKUrl(), RegistryType.ZK)
+      : null;
+    boolean localFirst = StringUtils.isBlank(peerZk)
+      || (StringUtils.isNotBlank(localZk) && localZk.compareTo(peerZk) <= 0);
+
+    String zkUrl1 = localFirst ? localZk : peerZk;
+    String zkUrl2 = localFirst ? peerZk : localZk;
+    ClusterRoleRecord.ClusterRole role1 =
+      localFirst ? record.getClusterRole() : record.getPeerClusterRole();
+    ClusterRoleRecord.ClusterRole role2 =
+      localFirst ? record.getPeerClusterRole() : record.getClusterRole();
+    String clusterUrl1 = localFirst ? record.getClusterUrl() : 
record.getPeerClusterUrl();
+    String clusterUrl2 = localFirst ? record.getPeerClusterUrl() : 
record.getClusterUrl();
+    String hdfsUrl1 = localFirst ? record.getHdfsUrl() : 
record.getPeerHdfsUrl();
+    String hdfsUrl2 = localFirst ? record.getPeerHdfsUrl() : 
record.getHdfsUrl();
+
     // Update non-null fields only.
     if (record.getPolicy() != null) {
       updateQuery.append(", ").append(POLICY);
@@ -680,40 +703,52 @@ public class HAGroupStoreClient implements Closeable {
       parameters.add(record.getPolicy().toString());
     }
 
-    if (record.getClusterRole() != null) {
+    if (role1 != null) {
       updateQuery.append(", ").append(CLUSTER_ROLE_1);
       valuesClause.append(", ?");
-      parameters.add(record.getClusterRole().name());
+      parameters.add(role1.name());
     }
 
-    if (record.getPeerClusterRole() != null) {
+    if (role2 != null) {
       updateQuery.append(", ").append(CLUSTER_ROLE_2);
       valuesClause.append(", ?");
-      parameters.add(record.getPeerClusterRole().name());
+      parameters.add(role2.name());
     }
 
-    if (record.getClusterUrl() != null) {
+    if (clusterUrl1 != null) {
       updateQuery.append(", ").append(CLUSTER_URL_1);
       valuesClause.append(", ?");
-      parameters.add(record.getClusterUrl());
+      parameters.add(clusterUrl1);
     }
 
-    if (record.getPeerClusterUrl() != null) {
+    if (clusterUrl2 != null) {
       updateQuery.append(", ").append(CLUSTER_URL_2);
       valuesClause.append(", ?");
-      parameters.add(record.getPeerClusterUrl());
+      parameters.add(clusterUrl2);
     }
 
-    if (record.getZkUrl() != null) {
+    if (zkUrl1 != null) {
       updateQuery.append(", ").append(ZK_URL_1);
       valuesClause.append(", ?");
-      parameters.add(record.getZkUrl());
+      parameters.add(zkUrl1);
     }
 
-    if (record.getPeerZKUrl() != null) {
+    if (zkUrl2 != null) {
       updateQuery.append(", ").append(ZK_URL_2);
       valuesClause.append(", ?");
-      parameters.add(record.getPeerZKUrl());
+      parameters.add(zkUrl2);
+    }
+
+    if (hdfsUrl1 != null) {
+      updateQuery.append(", ").append(HDFS_URL_1);
+      valuesClause.append(", ?");
+      parameters.add(hdfsUrl1);
+    }
+
+    if (hdfsUrl2 != null) {
+      updateQuery.append(", ").append(HDFS_URL_2);
+      valuesClause.append(", ?");
+      parameters.add(hdfsUrl2);
     }
 
     if (record.getAdminCRRVersion() > 0) {
@@ -794,11 +829,17 @@ public class HAGroupStoreClient implements Closeable {
       ClusterRoleRecord.ClusterRole peerClusterRole = peerZkRecord != null
         ? peerZkRecord.getClusterRole()
         : ClusterRoleRecord.ClusterRole.UNKNOWN;
-      // Create SystemTableHAGroupRecord from ZK data
+      // Create SystemTableHAGroupRecord from ZK data. Format the ZK URLs so 
they match the
+      // normalized values getSystemTableHAGroupRecord() reads back; otherwise 
the equality check
+      // below never matches and the sync rewrites (and logs) the row every 
cycle.
+      String formattedZkUrl = JDBCUtil.formatUrl(this.zkUrl, RegistryType.ZK);
+      String formattedPeerZkUrl = 
StringUtils.isNotBlank(zkRecord.getPeerZKUrl())
+        ? JDBCUtil.formatUrl(zkRecord.getPeerZKUrl(), RegistryType.ZK)
+        : zkRecord.getPeerZKUrl();
       SystemTableHAGroupRecord newSystemTableRecord =
         new 
SystemTableHAGroupRecord(HighAvailabilityPolicy.valueOf(zkRecord.getPolicy()),
           zkRecord.getClusterRole(), peerClusterRole, zkRecord.getClusterUrl(),
-          zkRecord.getPeerClusterUrl(), this.zkUrl, zkRecord.getPeerZKUrl(), 
zkRecord.getHdfsUrl(),
+          zkRecord.getPeerClusterUrl(), formattedZkUrl, formattedPeerZkUrl, 
zkRecord.getHdfsUrl(),
           zkRecord.getPeerHdfsUrl(), zkRecord.getAdminCRRVersion());
 
       // Read existing record from system table to check if update is needed
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index 8997fc2bc4..e70dbd5bda 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -56,11 +56,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
 import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
 import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -1108,40 +1115,36 @@ public class HAGroupStoreClientIT extends HABaseIT {
   public void testPeriodicSyncJobExecutorStartsAndSyncsData() throws Exception 
{
     String haGroupName = testName.getMethodName();
 
-    // 1. Setup: Create initial system table record with default values
+    // Seed the system table in a desynced shape: the HDFS URLs are stored in 
the opposite slot
+    // order from the ZK URLs (HDFS_URL_1 holds the peer's HDFS URL). The 
periodic sync must re-pair
+    // them so each slot's ZK_URL_n and HDFS_URL_n refer to the same cluster. 
ZK (created below) is
+    // the source of truth.
     HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, 
this.zkUrl, this.peerZKUrl,
       this.masterUrl, this.peerMasterUrl, ClusterRoleRecord.ClusterRole.ACTIVE,
-      ClusterRoleRecord.ClusterRole.STANDBY, null, CLUSTERS.getHdfsUrl1(), 
CLUSTERS.getHdfsUrl2());
+      ClusterRoleRecord.ClusterRole.STANDBY, null, CLUSTERS.getHdfsUrl2(), 
CLUSTERS.getHdfsUrl1());
 
-    // 2. Create ZK record with DIFFERENT values for testable fields (skip 
zkUrl changes)
+    // ZK carries the values the sync should propagate: locally 
DEGRADED_STANDBY (-> STANDBY role)
+    // with updated cluster URLs, peer STANDBY_TO_ACTIVE, version 5, HDFS URLs 
correctly paired.
     String updatedClusterUrl = this.masterUrl + ":updated";
     String updatedPeerClusterUrl = this.peerMasterUrl + ":updated";
     HAGroupStoreRecord zkRecord = new HAGroupStoreRecord("v2.0", haGroupName,
-      HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
System.currentTimeMillis(), // Different
-                                                                               
     // state and
-                                                                               
     // sync time
-      HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, // Keep 
original peer ZK URL
-      updatedClusterUrl, // Different cluster URL
-      updatedPeerClusterUrl, // Different peer cluster URL
-      CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 5L); // Different version
+      HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
System.currentTimeMillis(),
+      HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, 
updatedClusterUrl,
+      updatedPeerClusterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 
5L);
     createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, 
zkRecord);
-
-    // Also create a peer ZK record with STANDBY_TO_ACTIVE role to test peer 
role sync
     HAGroupStoreRecord peerZkRecord =
       new HAGroupStoreRecord("v2.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE,
-        0L, HighAvailabilityPolicy.FAILOVER.toString(), updatedClusterUrl, 
this.peerMasterUrl,
-        updatedClusterUrl, CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 5L);
+        0L, HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, 
updatedPeerClusterUrl,
+        updatedClusterUrl, CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 5L);
     createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, 
peerZkRecord);
 
-    // 3. Create HAGroupStoreClient with short sync interval for testing
     Configuration testConf = new 
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-    testConf.setLong("phoenix.ha.group.store.sync.interval.seconds", 15); // 
15 seconds for faster
-                                                                          // 
testing
+    testConf.setLong("phoenix.ha.group.store.sync.interval.seconds", 15);
 
     try (HAGroupStoreClient haGroupStoreClient =
       new HAGroupStoreClient(testConf, null, null, haGroupName, zkUrl)) {
 
-      // 3. Verify sync executor is running by checking private field via 
reflection
+      // The sync executor must be started and running.
       Field syncExecutorField = 
HAGroupStoreClient.class.getDeclaredField("syncExecutor");
       syncExecutorField.setAccessible(true);
       ScheduledExecutorService syncExecutor =
@@ -1149,71 +1152,85 @@ public class HAGroupStoreClientIT extends HABaseIT {
       assertNotNull("Sync executor should be initialized", syncExecutor);
       assertFalse("Sync executor should not be shutdown", 
syncExecutor.isShutdown());
 
-      // 4. Wait for at least one sync cycle (with jitter buffer)
-      Thread.sleep(25000); // Wait 25 seconds (15s + 10s buffer for jitter)
-
-      // 5. Verify that system table was updated with ZK data (ZK is source of 
truth)
-      // Check system table directly to see if all fields were synced
-      try (
-        PhoenixConnection conn = (PhoenixConnection) DriverManager
-          .getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
-        Statement stmt = conn.createStatement(); ResultSet rs = 
stmt.executeQuery("SELECT * FROM "
-          + SYSTEM_HA_GROUP_NAME + " WHERE HA_GROUP_NAME = '" + haGroupName + 
"'")) {
-        assertTrue("System table should have record", rs.next());
-
-        // Verify all fields were synced from ZK with the UPDATED values 
(except zkUrls which remain
-        // unchanged)
-        assertEquals("HA_GROUP_NAME should match", haGroupName, 
rs.getString("HA_GROUP_NAME"));
-        assertEquals("POLICY should be synced from ZK", "FAILOVER", 
rs.getString("POLICY"));
-        assertEquals("VERSION should be synced from ZK", 5L, 
rs.getLong("VERSION"));
-        assertEquals("ZK_URL_1 should remain unchanged", this.zkUrl, 
rs.getString("ZK_URL_1"));
-        assertEquals("ZK_URL_2 should remain unchanged", this.peerZKUrl, 
rs.getString("ZK_URL_2"));
-        assertEquals("CLUSTER_ROLE_1 should be synced", "STANDBY", 
rs.getString("CLUSTER_ROLE_1")); // DEGRADED_STANDBY
-                                                                               
                     // maps
-                                                                               
                     // to
-                                                                               
                     // STANDBY
-                                                                               
                     // role
-        assertEquals("CLUSTER_ROLE_2 should be synced", "STANDBY_TO_ACTIVE",
-          rs.getString("CLUSTER_ROLE_2")); // Peer role from peer ZK
-        assertEquals("CLUSTER_URL_1 should be synced", updatedClusterUrl,
-          rs.getString("CLUSTER_URL_1"));
-        assertEquals("CLUSTER_URL_2 should be synced", updatedPeerClusterUrl,
-          rs.getString("CLUSTER_URL_2"));
-
-        // All fields successfully verified - sync job is working correctly
-      }
+      // After one sync cycle (15s interval + up to 10s jitter), the table 
mirrors ZK with the
+      // ZK/CLUSTER/ROLE/HDFS columns re-paired into canonical slot order.
+      Thread.sleep(25000);
+      assertSyncedRowIsPaired(haGroupName, updatedClusterUrl, 
updatedPeerClusterUrl);
+      long rowTimestampAfterConverge = systemTableRowTimestamp(haGroupName);
+
+      // The row now matches ZK, so subsequent cycles must skip the upsert 
entirely. Wait for at
+      // least one more cycle and assert the row is neither changed nor 
rewritten (idempotent sync).
+      Thread.sleep(16000);
+      assertSyncedRowIsPaired(haGroupName, updatedClusterUrl, 
updatedPeerClusterUrl);
+      assertEquals("Periodic sync must not rewrite an unchanged row", 
rowTimestampAfterConverge,
+        systemTableRowTimestamp(haGroupName));
+
+      // close() shuts the executor down.
+      haGroupStoreClient.close();
+      assertTrue("Sync executor should be shutdown after close", 
syncExecutor.isShutdown());
+    }
+  }
 
-      // 7. Test that no update happens when system table is already in sync 
with ZK
-      // Wait for another sync cycle to ensure the optimization is working
-      Thread.sleep(16000); // Wait for another sync cycle (15s + 1s buffer)
-
-      // Verify system table still has the same data (no redundant updates)
-      try (
-        PhoenixConnection conn = (PhoenixConnection) DriverManager
-          .getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
-        Statement stmt = conn.createStatement(); ResultSet rs = 
stmt.executeQuery("SELECT * FROM "
-          + SYSTEM_HA_GROUP_NAME + " WHERE HA_GROUP_NAME = '" + haGroupName + 
"'")) {
-        assertTrue("System table should still have record", rs.next());
-
-        // Verify all fields remain the same (no unnecessary update occurred)
-        assertEquals("VERSION should remain the same", 5L, 
rs.getLong("VERSION"));
-        assertEquals("CLUSTER_ROLE_1 should remain the same", "STANDBY",
-          rs.getString("CLUSTER_ROLE_1"));
-        assertEquals("CLUSTER_ROLE_2 should remain the same", 
"STANDBY_TO_ACTIVE",
-          rs.getString("CLUSTER_ROLE_2"));
-        assertEquals("CLUSTER_URL_1 should remain the same", updatedClusterUrl,
-          rs.getString("CLUSTER_URL_1"));
-        assertEquals("CLUSTER_URL_2 should remain the same", 
updatedPeerClusterUrl,
-          rs.getString("CLUSTER_URL_2"));
-
-        // This verifies that the equals() check is working and preventing 
redundant updates
-      }
+  /**
+   * Reads the SYSTEM.HA_GROUP row for {@code haGroupName} from the local 
cluster and asserts it is
+   * internally consistent after a ZK-&gt;table sync: whichever slot holds the 
local cluster's
+   * (formatted) ZK URL also holds the local cluster's CLUSTER_URL, 
CLUSTER_ROLE and HDFS_URL, with
+   * the peer's values in the other slot. Slot order is canonical 
(formatted-ZK alphabetical), so
+   * this holds regardless of which cluster lands in slot 1. The local 
cluster's expected role,
+   * version and HDFS URL are fixed by the test fixture (DEGRADED_STANDBY 
-&gt; STANDBY, version 5).
+   */
+  private void assertSyncedRowIsPaired(String haGroupName, String 
localClusterUrl,
+    String peerClusterUrl) throws SQLException {
+    String formattedLocalZk = JDBCUtil.formatUrl(this.zkUrl, 
ClusterRoleRecord.RegistryType.ZK);
+    String formattedPeerZk = JDBCUtil.formatUrl(this.peerZKUrl, 
ClusterRoleRecord.RegistryType.ZK);
+    try (
+      PhoenixConnection conn = (PhoenixConnection) DriverManager
+        .getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+      Statement stmt = conn.createStatement(); ResultSet rs = 
stmt.executeQuery(
+        "SELECT * FROM " + SYSTEM_HA_GROUP_NAME + " WHERE HA_GROUP_NAME = '" + 
haGroupName + "'")) {
+      assertTrue("System table should have a record for " + haGroupName, 
rs.next());
+      assertEquals("POLICY should be synced from ZK", "FAILOVER", 
rs.getString("POLICY"));
+      assertEquals("VERSION should be synced from ZK", 5L, 
rs.getLong("VERSION"));
+
+      // Resolve which slot the local cluster landed in, then assert every 
column in that slot
+      // belongs to the local cluster and every column in the other slot to 
the peer.
+      boolean localIsSlot1 = formattedLocalZk.equals(rs.getString("ZK_URL_1"));
+      assertEquals("slot 1 ZK URL", localIsSlot1 ? formattedLocalZk : 
formattedPeerZk,
+        rs.getString("ZK_URL_1"));
+      assertEquals("slot 2 ZK URL", localIsSlot1 ? formattedPeerZk : 
formattedLocalZk,
+        rs.getString("ZK_URL_2"));
+      assertEquals("CLUSTER_ROLE_1", localIsSlot1 ? "STANDBY" : 
"STANDBY_TO_ACTIVE",
+        rs.getString("CLUSTER_ROLE_1"));
+      assertEquals("CLUSTER_ROLE_2", localIsSlot1 ? "STANDBY_TO_ACTIVE" : 
"STANDBY",
+        rs.getString("CLUSTER_ROLE_2"));
+      assertEquals("CLUSTER_URL_1", localIsSlot1 ? localClusterUrl : 
peerClusterUrl,
+        rs.getString("CLUSTER_URL_1"));
+      assertEquals("CLUSTER_URL_2", localIsSlot1 ? peerClusterUrl : 
localClusterUrl,
+        rs.getString("CLUSTER_URL_2"));
+      assertEquals("HDFS_URL_1 must stay paired with ZK_URL_1",
+        localIsSlot1 ? CLUSTERS.getHdfsUrl1() : CLUSTERS.getHdfsUrl2(), 
rs.getString("HDFS_URL_1"));
+      assertEquals("HDFS_URL_2 must stay paired with ZK_URL_2",
+        localIsSlot1 ? CLUSTERS.getHdfsUrl2() : CLUSTERS.getHdfsUrl1(), 
rs.getString("HDFS_URL_2"));
+    }
+  }
 
-      // 8. Test cleanup - verify executor shuts down properly when we exit 
try-with-resources
-      // The close() will be called automatically, and we can verify shutdown 
in a separate
-      // assertion
-      haGroupStoreClient.close(); // Explicit close to test shutdown
-      assertTrue("Sync executor should be shutdown after close", 
syncExecutor.isShutdown());
+  /**
+   * Returns the latest HBase cell timestamp of the HA group's row in 
SYSTEM.HA_GROUP on the local
+   * cluster. An UPSERT always writes new cell versions, so a stable timestamp 
across a sync cycle
+   * proves the periodic sync skipped the write (i.e. the equality check 
matched).
+   */
+  private long systemTableRowTimestamp(String haGroupName) throws Exception {
+    Configuration conf = CLUSTERS.getHBaseCluster1().getConfiguration();
+    TableName physicalName =
+      SchemaUtil.getPhysicalTableName(Bytes.toBytes(SYSTEM_HA_GROUP_NAME), 
conf);
+    try (Table table = 
CLUSTERS.getHBaseCluster1().getConnection().getTable(physicalName)) {
+      Result result = table.get(new Get(Bytes.toBytes(haGroupName)));
+      long maxTimestamp = 0L;
+      for (Cell cell : result.rawCells()) {
+        maxTimestamp = Math.max(maxTimestamp, cell.getTimestamp());
+      }
+      assertTrue("System table row should exist for " + haGroupName, 
maxTimestamp > 0L);
+      return maxTimestamp;
     }
   }
 

Reply via email to