This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9 by this push:
new fc1301121 ZOOKEEPER-4747: Add synchronous sync to ease synchronous
call chains (#2068)
fc1301121 is described below
commit fc13011217e2147327b69dd9a3b4192c68d282f0
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Sep 10 14:30:13 2024 +0800
ZOOKEEPER-4747: Add synchronous sync to ease synchronous call chains (#2068)
Previously, there was no synchronous `sync`, so clients had to wrap the
asynchronous `sync` themselves to fit it into synchronous call chains.
This is clearly unfriendly.
Besides the above, in the absence of ZOOKEEPER-22, we can't issue a
fire-and-forget asynchronous `sync` to gain strong consistency. So it
becomes crucial for clients to have a convenient synchronous `sync`.
Refs: ZOOKEEPER-1167, ZOOKEEPER-4747
(cherry picked from commit 3e2d6f3436962cd4a0445bc6c22f583e425d7ced)
---
.../main/java/org/apache/zookeeper/ZooKeeper.java | 25 +++++++++++++
.../test/java/org/apache/zookeeper/ZKTestCase.java | 20 +++++++++++
.../server/quorum/EagerACLFilterTest.java | 18 ++--------
.../server/quorum/QuorumRequestPipelineTest.java | 13 +++++++
.../java/org/apache/zookeeper/test/QuorumTest.java | 41 ++++++++++------------
.../org/apache/zookeeper/test/SessionTest.java | 18 ++--------
6 files changed, 82 insertions(+), 53 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 09006ed10..551e4602d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -2666,6 +2666,31 @@ public class ZooKeeper implements AutoCloseable {
getEphemerals("/", cb, ctx);
}
+ /**
+ * Synchronous sync. Flushes channel between process and leader.
+ *
+ * @param path the given path
+ * @throws KeeperException If the server signals an error with a non-zero
error code
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public void sync(final String path) throws KeeperException,
InterruptedException {
+ final String clientPath = path;
+ PathUtils.validatePath(clientPath);
+
+ final String serverPath = prependChroot(clientPath);
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.sync);
+ SyncRequest request = new SyncRequest();
+ SyncResponse response = new SyncResponse();
+ request.setPath(serverPath);
+ ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+ if (r.getErr() != 0) {
+ throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
+ }
+ }
+
/**
* Asynchronous sync. Flushes channel between process and leader.
* @param path
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index b29deedb0..db2479718 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.util.ServiceUtils;
import org.hamcrest.CustomMatcher;
@@ -58,6 +59,25 @@ public class ZKTestCase {
return testName;
}
+ public void syncClient(ZooKeeper zk, boolean synchronous) throws
KeeperException {
+ if (synchronous) {
+ try {
+ zk.sync("/");
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ return;
+ }
+ final CompletableFuture<KeeperException.Code> synced = new
CompletableFuture<>();
+ zk.sync("/", (rc, path, ctx) -> {
+ synced.complete(KeeperException.Code.get(rc));
+ }, null);
+ KeeperException.Code code = synced.join();
+ if (code != KeeperException.Code.OK) {
+ throw KeeperException.create(code);
+ }
+ }
+
@BeforeAll
public static void before() {
if (!testBaseDir.exists()) {
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
index 4141c586f..1af669fb5 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
@@ -22,13 +22,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.QuorumBase;
@@ -95,18 +93,6 @@ public class EagerACLFilterTest extends QuorumBase {
clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
}
- void syncClient(ZooKeeper zk) {
- CompletableFuture<Void> synced = new CompletableFuture<>();
- zk.sync("/", (rc, path, ctx) -> {
- if (rc == 0) {
- synced.complete(null);
- } else {
-
synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
- }
- }, null);
- synced.join();
- }
-
@AfterEach
public void tearDown() throws Exception {
if (zkClient != null) {
@@ -124,7 +110,7 @@ public class EagerACLFilterTest extends QuorumBase {
ZooKeeperServer.setEnableEagerACLCheck(enabled);
}
- private void assertTransactionState(String operation, QuorumPeer peer,
long lastxid) {
+ private void assertTransactionState(String operation, QuorumPeer peer,
long lastxid) throws Exception {
if (peer == zkLeader && peer != zkConnected) {
// The operation is performed on no leader, but we are asserting
on leader.
// There is no happen-before between
`zkLeader.getLastLoggedZxid()` and
@@ -133,7 +119,7 @@ public class EagerACLFilterTest extends QuorumBase {
// to sync leader client to go through commit and response path in
leader to
// build happen-before between `zkLeader.getLastLoggedZxid()` and
side effect
// of previous operation.
- syncClient(zkLeaderClient);
+ syncClient(zkLeaderClient, false);
}
assertTrue(peer == zkLeader || peer == zkConnected);
boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
index 624661044..f848b0289 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -179,4 +181,15 @@ public class QuorumRequestPipelineTest extends QuorumBase {
assertTrue(complete, String.format("%s Sync completed", serverState));
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testSynchronousSync(ServerState serverState) throws Exception {
+ setUp(serverState);
+ create2EmptyNode(zkClient, PARENT_PATH);
+ ForkJoinTask<Void> task = ForkJoinPool.commonPool().submit(() -> {
+ zkClient.sync(PARENT_PATH);
+ return null;
+ });
+ task.get(30, TimeUnit.SECONDS);
+ }
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
index ff53b0fe9..01aa9ddc7 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
@@ -182,15 +183,24 @@ public class QuorumTest extends ZKTestCase {
}
/**
- * Make sure that we can change sessions
- * from follower to leader.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
+ * Make sure that we can change sessions among servers and maintain
consistent view
+ * using {@link ZooKeeper#sync(String)}.
*/
@Test
- public void testSessionMoved() throws Exception {
+ public void testSessionMovedWithSynchronousSync() throws Exception {
+ testSessionMoved(true);
+ }
+
+ /**
+ * Make sure that we can change sessions among servers and maintain
consistent view
+ * using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback,
Object)}.
+ */
+ @Test
+ public void testSessionMovedWithAsynchronousSync() throws Exception {
+ testSessionMoved(false);
+ }
+
+ public void testSessionMoved(boolean synchronous_sync) throws Exception {
String[] hostPorts = qb.hostPort.split(",");
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
hostPorts[0],
@@ -208,21 +218,8 @@ public class QuorumTest extends ZKTestCase {
zk.getSessionId(),
zk.getSessionPasswd());
zknew.setData("/", new byte[1], -1);
- final int[] result = new int[1];
- result[0] = Integer.MAX_VALUE;
- zknew.sync("/", (rc, path, ctx) -> {
- synchronized (result) {
- result[0] = rc;
- result.notify();
- }
- }, null);
- synchronized (result) {
- if (result[0] == Integer.MAX_VALUE) {
- result.wait(5000);
- }
- }
- LOG.info("{} Sync returned {}", hostPorts[(i + 1) %
hostPorts.length], result[0]);
- assertTrue(result[0] == KeeperException.Code.OK.intValue());
+ syncClient(zknew, synchronous_sync);
+ LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
try {
zk.setData("/", new byte[1], -1);
fail("Should have lost the connection");
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
index 85f76c213..3a4766c77 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
@@ -243,21 +243,8 @@ public class SessionTest extends ZKTestCase {
% hostPorts.length], CONNECTION_TIMEOUT, new
MyWatcher(Integer.toString(
i
+ 1)), zk.getSessionId(), zk.getSessionPasswd());
- final int[] result = new int[1];
- result[0] = Integer.MAX_VALUE;
- zknew.sync("/", (rc, path, ctx) -> {
- synchronized (result) {
- result[0] = rc;
- result.notify();
- }
- }, null);
- synchronized (result) {
- if (result[0] == Integer.MAX_VALUE) {
- result.wait(5000);
- }
- }
- LOG.info("{} Sync returned {}", hostPorts[(i + 1) %
hostPorts.length], result[0]);
- assertTrue(result[0] == KeeperException.Code.OK.intValue());
+ zknew.sync("/");
+ LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
zknew.setData("/", new byte[1], -1);
try {
zk.setData("/", new byte[1], -1);
@@ -270,6 +257,7 @@ public class SessionTest extends ZKTestCase {
}
zk.close();
}
+
/**
* This test makes sure that duplicate state changes are not communicated
* to the client watcher. For example we should not notify state as