This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ee321c1b [AMORO-3686][Improvement]: Use ZK's multi operation to 
ensure write consistency. (#3687)
0ee321c1b is described below

commit 0ee321c1b02c737b93965b2333802c55d0fb3206
Author: can <[email protected]>
AuthorDate: Thu Jul 31 16:32:31 2025 +0800

    [AMORO-3686][Improvement]: Use ZK's multi operation to ensure write 
consistency. (#3687)
    
    [Improvement]: Use ZK's multi operation in HighAvailabilityContainer to 
ensure write consistency and fix the followerLath word spelling error #3686
    
    Co-authored-by: wardli <[email protected]>
---
 .../amoro/server/HighAvailabilityContainer.java    | 41 +++++++++++++---------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
index 97ed08c7b..6d15d3735 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
@@ -23,6 +23,7 @@ import org.apache.amoro.config.Configurations;
 import org.apache.amoro.properties.AmsHAProperties;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFrameworkFactory;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.transaction.CuratorOp;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.retry.ExponentialBackoffRetry;
@@ -46,7 +47,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
   private final String optimizingServiceMasterPath;
   private final AmsServerInfo tableServiceServerInfo;
   private final AmsServerInfo optimizingServiceServerInfo;
-  private volatile CountDownLatch followerLath;
+  private volatile CountDownLatch followerLatch;
 
   public HighAvailabilityContainer(Configurations serviceConfig) throws 
Exception {
     if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
@@ -92,7 +93,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
       tableServiceServerInfo = null;
       optimizingServiceServerInfo = null;
       // block follower latch forever when ha is disabled
-      followerLath = new CountDownLatch(1);
+      followerLatch = new CountDownLatch(1);
     }
   }
 
@@ -101,17 +102,25 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
     if (leaderLatch != null) {
       leaderLatch.await();
       if (leaderLatch.hasLeadership()) {
+        CuratorOp tableServiceMasterPathOp =
+            zkClient
+                .transactionOp()
+                .setData()
+                .forPath(
+                    tableServiceMasterPath,
+                    JacksonUtil.toJSONString(tableServiceServerInfo)
+                        .getBytes(StandardCharsets.UTF_8));
+        CuratorOp optimizingServiceMasterPathOp =
+            zkClient
+                .transactionOp()
+                .setData()
+                .forPath(
+                    optimizingServiceMasterPath,
+                    JacksonUtil.toJSONString(optimizingServiceServerInfo)
+                        .getBytes(StandardCharsets.UTF_8));
         zkClient
-            .setData()
-            .forPath(
-                tableServiceMasterPath,
-                
JacksonUtil.toJSONString(tableServiceServerInfo).getBytes(StandardCharsets.UTF_8));
-        zkClient
-            .setData()
-            .forPath(
-                optimizingServiceMasterPath,
-                JacksonUtil.toJSONString(optimizingServiceServerInfo)
-                    .getBytes(StandardCharsets.UTF_8));
+            .transaction()
+            .forOperations(tableServiceMasterPathOp, 
optimizingServiceMasterPathOp);
       }
     }
     LOG.info("Became the leader of AMS");
@@ -119,8 +128,8 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
 
   public void waitFollowerShip() throws Exception {
     LOG.info("Waiting to become the follower of AMS");
-    if (followerLath != null) {
-      followerLath.await();
+    if (followerLatch != null) {
+      followerLatch.await();
     }
     LOG.info("Became the follower of AMS");
   }
@@ -142,7 +151,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
         "Table service server {} and optimizing service server {} got 
leadership",
         tableServiceServerInfo.toString(),
         optimizingServiceServerInfo.toString());
-    followerLath = new CountDownLatch(1);
+    followerLatch = new CountDownLatch(1);
   }
 
   @Override
@@ -151,7 +160,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
         "Table service server {} and optimizing service server {} lost 
leadership",
         tableServiceServerInfo.toString(),
         optimizingServiceServerInfo.toString());
-    followerLath.countDown();
+    followerLatch.countDown();
   }
 
   private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int 
restBindPort) {

Reply via email to