This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 0ee321c1b [AMORO-3686][Improvement]: Use ZK's multi operation to
ensure write consistency. (#3687)
0ee321c1b is described below
commit 0ee321c1b02c737b93965b2333802c55d0fb3206
Author: can <[email protected]>
AuthorDate: Thu Jul 31 16:32:31 2025 +0800
[AMORO-3686][Improvement]: Use ZK's multi operation to ensure write
consistency. (#3687)
[Improvement]: Use ZK's multi operation in HighAvailabilityContainer to
ensure write consistency, and fix the misspelled field name followerLath
(renamed to followerLatch). #3686
Co-authored-by: wardli <[email protected]>
---
.../amoro/server/HighAvailabilityContainer.java | 41 +++++++++++++---------
1 file changed, 25 insertions(+), 16 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
index 97ed08c7b..6d15d3735 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
@@ -23,6 +23,7 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.properties.AmsHAProperties;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFrameworkFactory;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.transaction.CuratorOp;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.retry.ExponentialBackoffRetry;
@@ -46,7 +47,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
private final String optimizingServiceMasterPath;
private final AmsServerInfo tableServiceServerInfo;
private final AmsServerInfo optimizingServiceServerInfo;
- private volatile CountDownLatch followerLath;
+ private volatile CountDownLatch followerLatch;
public HighAvailabilityContainer(Configurations serviceConfig) throws
Exception {
if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
@@ -92,7 +93,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
tableServiceServerInfo = null;
optimizingServiceServerInfo = null;
// block follower latch forever when ha is disabled
- followerLath = new CountDownLatch(1);
+ followerLatch = new CountDownLatch(1);
}
}
@@ -101,17 +102,25 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
if (leaderLatch != null) {
leaderLatch.await();
if (leaderLatch.hasLeadership()) {
+ CuratorOp tableServiceMasterPathOp =
+ zkClient
+ .transactionOp()
+ .setData()
+ .forPath(
+ tableServiceMasterPath,
+ JacksonUtil.toJSONString(tableServiceServerInfo)
+ .getBytes(StandardCharsets.UTF_8));
+ CuratorOp optimizingServiceMasterPathOp =
+ zkClient
+ .transactionOp()
+ .setData()
+ .forPath(
+ optimizingServiceMasterPath,
+ JacksonUtil.toJSONString(optimizingServiceServerInfo)
+ .getBytes(StandardCharsets.UTF_8));
zkClient
- .setData()
- .forPath(
- tableServiceMasterPath,
-
JacksonUtil.toJSONString(tableServiceServerInfo).getBytes(StandardCharsets.UTF_8));
- zkClient
- .setData()
- .forPath(
- optimizingServiceMasterPath,
- JacksonUtil.toJSONString(optimizingServiceServerInfo)
- .getBytes(StandardCharsets.UTF_8));
+ .transaction()
+ .forOperations(tableServiceMasterPathOp,
optimizingServiceMasterPathOp);
}
}
LOG.info("Became the leader of AMS");
@@ -119,8 +128,8 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
public void waitFollowerShip() throws Exception {
LOG.info("Waiting to become the follower of AMS");
- if (followerLath != null) {
- followerLath.await();
+ if (followerLatch != null) {
+ followerLatch.await();
}
LOG.info("Became the follower of AMS");
}
@@ -142,7 +151,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
"Table service server {} and optimizing service server {} got
leadership",
tableServiceServerInfo.toString(),
optimizingServiceServerInfo.toString());
- followerLath = new CountDownLatch(1);
+ followerLatch = new CountDownLatch(1);
}
@Override
@@ -151,7 +160,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
"Table service server {} and optimizing service server {} lost
leadership",
tableServiceServerInfo.toString(),
optimizingServiceServerInfo.toString());
- followerLath.countDown();
+ followerLatch.countDown();
}
private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int
restBindPort) {