This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.9 by this push:
     new fc1301121 ZOOKEEPER-4747: Add synchronous sync to ease synchronous 
call chains (#2068)
fc1301121 is described below

commit fc13011217e2147327b69dd9a3b4192c68d282f0
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Sep 10 14:30:13 2024 +0800

    ZOOKEEPER-4747: Add synchronous sync to ease synchronous call chains (#2068)
    
    Previously, there was no synchronous `sync`, so clients had to adapt the
    asynchronous `sync` to fit synchronous call chains. This is clearly
    unfriendly.
    
    Besides the above, in the absence of ZOOKEEPER-22, we can't issue a
    fire-and-forget asynchronous `sync` to gain strong consistency. So it
    becomes crucial for clients to have a convenient synchronous `sync`.
    
    Refs: ZOOKEEPER-1167, ZOOKEEPER-4747
    
    (cherry picked from commit 3e2d6f3436962cd4a0445bc6c22f583e425d7ced)
---
 .../main/java/org/apache/zookeeper/ZooKeeper.java  | 25 +++++++++++++
 .../test/java/org/apache/zookeeper/ZKTestCase.java | 20 +++++++++++
 .../server/quorum/EagerACLFilterTest.java          | 18 ++--------
 .../server/quorum/QuorumRequestPipelineTest.java   | 13 +++++++
 .../java/org/apache/zookeeper/test/QuorumTest.java | 41 ++++++++++------------
 .../org/apache/zookeeper/test/SessionTest.java     | 18 ++--------
 6 files changed, 82 insertions(+), 53 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 09006ed10..551e4602d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -2666,6 +2666,31 @@ public class ZooKeeper implements AutoCloseable {
         getEphemerals("/", cb, ctx);
     }
 
+    /**
+     * Synchronous sync. Flushes channel between process and leader.
+     *
+     * @param path the given path
+     * @throws KeeperException If the server signals an error with a non-zero 
error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws IllegalArgumentException if an invalid path is specified
+     */
+    public void sync(final String path) throws KeeperException, 
InterruptedException {
+        final String clientPath = path;
+        PathUtils.validatePath(clientPath);
+
+        final String serverPath = prependChroot(clientPath);
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.sync);
+        SyncRequest request = new SyncRequest();
+        SyncResponse response = new SyncResponse();
+        request.setPath(serverPath);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()), 
clientPath);
+        }
+    }
+
     /**
      * Asynchronous sync. Flushes channel between process and leader.
      * @param path
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index b29deedb0..db2479718 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
 import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.hamcrest.CustomMatcher;
@@ -58,6 +59,25 @@ public class ZKTestCase {
         return testName;
     }
 
+    public void syncClient(ZooKeeper zk, boolean synchronous) throws 
KeeperException {
+        if (synchronous) {
+            try {
+                zk.sync("/");
+            } catch (InterruptedException ex) {
+                throw new RuntimeException(ex);
+            }
+            return;
+        }
+        final CompletableFuture<KeeperException.Code> synced = new 
CompletableFuture<>();
+        zk.sync("/", (rc, path, ctx) -> {
+            synced.complete(KeeperException.Code.get(rc));
+        }, null);
+        KeeperException.Code code = synced.join();
+        if (code != KeeperException.Code.OK) {
+            throw KeeperException.create(code);
+        }
+    }
+
     @BeforeAll
     public static void before() {
         if (!testBaseDir.exists()) {
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
index 4141c586f..1af669fb5 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java
@@ -22,13 +22,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.test.QuorumBase;
@@ -95,18 +93,6 @@ public class EagerACLFilterTest extends QuorumBase {
         clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
     }
 
-    void syncClient(ZooKeeper zk) {
-        CompletableFuture<Void> synced = new CompletableFuture<>();
-        zk.sync("/", (rc, path, ctx) -> {
-            if (rc == 0) {
-                synced.complete(null);
-            } else {
-                
synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
-            }
-        }, null);
-        synced.join();
-    }
-
     @AfterEach
     public void tearDown() throws Exception {
         if (zkClient != null) {
@@ -124,7 +110,7 @@ public class EagerACLFilterTest extends QuorumBase {
         ZooKeeperServer.setEnableEagerACLCheck(enabled);
     }
 
-    private void assertTransactionState(String operation, QuorumPeer peer, 
long lastxid) {
+    private void assertTransactionState(String operation, QuorumPeer peer, 
long lastxid) throws Exception {
         if (peer == zkLeader && peer != zkConnected) {
             // The operation is performed on no leader, but we are asserting 
on leader.
             // There is no happen-before between 
`zkLeader.getLastLoggedZxid()` and
@@ -133,7 +119,7 @@ public class EagerACLFilterTest extends QuorumBase {
             // to sync leader client to go through commit and response path in 
leader to
             // build happen-before between `zkLeader.getLastLoggedZxid()` and 
side effect
             // of previous operation.
-            syncClient(zkLeaderClient);
+            syncClient(zkLeaderClient, false);
         }
         assertTrue(peer == zkLeader || peer == zkConnected);
         boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck();
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
index 624661044..f848b0289 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -179,4 +181,15 @@ public class QuorumRequestPipelineTest extends QuorumBase {
         assertTrue(complete, String.format("%s Sync completed", serverState));
     }
 
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testSynchronousSync(ServerState serverState) throws Exception {
+        setUp(serverState);
+        create2EmptyNode(zkClient, PARENT_PATH);
+        ForkJoinTask<Void> task = ForkJoinPool.commonPool().submit(() -> {
+            zkClient.sync(PARENT_PATH);
+            return null;
+        });
+        task.get(30, TimeUnit.SECONDS);
+    }
 }
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
index ff53b0fe9..01aa9ddc7 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.DummyWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -182,15 +183,24 @@ public class QuorumTest extends ZKTestCase {
     }
 
     /**
-     * Make sure that we can change sessions
-     *  from follower to leader.
-     *
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws KeeperException
+     * Make sure that we can change sessions among servers and maintain 
consistent view
+     * using {@link ZooKeeper#sync(String)}.
      */
     @Test
-    public void testSessionMoved() throws Exception {
+    public void testSessionMovedWithSynchronousSync() throws Exception {
+        testSessionMoved(true);
+    }
+
+    /**
+     * Make sure that we can change sessions among servers and maintain 
consistent view
+     * using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, 
Object)}.
+     */
+    @Test
+    public void testSessionMovedWithAsynchronousSync() throws Exception {
+        testSessionMoved(false);
+    }
+
+    public void testSessionMoved(boolean synchronous_sync) throws Exception {
         String[] hostPorts = qb.hostPort.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
             hostPorts[0],
@@ -208,21 +218,8 @@ public class QuorumTest extends ZKTestCase {
                 zk.getSessionId(),
                 zk.getSessionPasswd());
             zknew.setData("/", new byte[1], -1);
-            final int[] result = new int[1];
-            result[0] = Integer.MAX_VALUE;
-            zknew.sync("/", (rc, path, ctx) -> {
-                synchronized (result) {
-                    result[0] = rc;
-                    result.notify();
-                }
-            }, null);
-            synchronized (result) {
-                if (result[0] == Integer.MAX_VALUE) {
-                    result.wait(5000);
-                }
-            }
-            LOG.info("{} Sync returned {}", hostPorts[(i + 1) % 
hostPorts.length], result[0]);
-            assertTrue(result[0] == KeeperException.Code.OK.intValue());
+            syncClient(zknew, synchronous_sync);
+            LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
             try {
                 zk.setData("/", new byte[1], -1);
                 fail("Should have lost the connection");
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
index 85f76c213..3a4766c77 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java
@@ -243,21 +243,8 @@ public class SessionTest extends ZKTestCase {
                                                                                
           % hostPorts.length], CONNECTION_TIMEOUT, new 
MyWatcher(Integer.toString(
                     i
                             + 1)), zk.getSessionId(), zk.getSessionPasswd());
-            final int[] result = new int[1];
-            result[0] = Integer.MAX_VALUE;
-            zknew.sync("/", (rc, path, ctx) -> {
-                synchronized (result) {
-                    result[0] = rc;
-                    result.notify();
-                }
-            }, null);
-            synchronized (result) {
-                if (result[0] == Integer.MAX_VALUE) {
-                    result.wait(5000);
-                }
-            }
-            LOG.info("{} Sync returned {}", hostPorts[(i + 1) % 
hostPorts.length], result[0]);
-            assertTrue(result[0] == KeeperException.Code.OK.intValue());
+            zknew.sync("/");
+            LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
             zknew.setData("/", new byte[1], -1);
             try {
                 zk.setData("/", new byte[1], -1);
@@ -270,6 +257,7 @@ public class SessionTest extends ZKTestCase {
         }
         zk.close();
     }
+
     /**
      * This test makes sure that duplicate state changes are not communicated
      * to the client watcher. For example we should not notify state as

Reply via email to