This is an automated email from the ASF dual-hosted git repository.

arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new a53cfeb  ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
a53cfeb is described below

commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a
Author: Mukti Krishnan <[email protected]>
AuthorDate: Tue Mar 9 21:46:52 2021 +0530

    ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
    
    Author: Mukti <[email protected]>
    
    Reviewers: Mohammad Arshad <[email protected]>
    
    Closes #1619 from MuktiKrishnan/ZOOKEEPER-4225 and squashes the following commits:
    
    cb7a9868f [Mukti] ZOOKEEPER-1871: Corrected argument order of assert statement
    51a489f2d [Mukti] ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |   3 +
 .../apache/zookeeper/server/quorum/Learner.java    |   4 +-
 .../server/quorum/QuorumPeerMainTest.java          | 140 +++++++++++++++++++++
 3 files changed, 146 insertions(+), 1 deletion(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 64ef6af..6a58dc1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -802,6 +802,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
+            if (fullyShutDown && zkDb != null) {
+                zkDb.clear();
+            }
             LOG.debug("ZooKeeper server is not running, so not proceeding to 
shutdown!");
             return;
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 4f48d97..fa7cc59 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -802,7 +802,9 @@ public class Learner {
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
-            zk.shutdown();
+            // If we haven't finished SNAP sync, force fully shutdown
+            // to avoid potential inconsistency
+            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
         }
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index fed5fea..2cd4d00 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -1624,11 +1624,139 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         assertTrue("complains about metrics provider 
MetricsProviderLifeCycleException", found);
     }
 
+    /**
+     * If learner failed to do SNAP sync with leader before it's writing
+     * the snapshot to disk, it's possible that it might have DIFF sync
+     * with new leader or itself being elected as a leader.
+     *
+     * This test is trying to guarantee there is no data inconsistency for
+     * this case.
+     */
+    @Test
+    public void testDiffSyncAfterSnap() throws Exception {
+        final int ENSEMBLE_SERVERS = 3;
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+        try {
+            // 1. start a quorum
+            final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+            StringBuilder sb = new StringBuilder();
+            String server;
+
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                clientPorts[i] = PortAssignment.unique();
+                server = "server." + i + "=127.0.0.1:" + 
PortAssignment.unique()
+                        + ":" + PortAssignment.unique()
+                        + ":participant;127.0.0.1:" + clientPorts[i];
+                sb.append(server + "\n");
+            }
+            String currentQuorumCfgSection = sb.toString();
+
+            // start servers
+            Context[] contexts = new Context[ENSEMBLE_SERVERS];
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                final Context context = new Context();
+                contexts[i] = context;
+                mt[i] = new MainThread(i, clientPorts[i], 
currentQuorumCfgSection, false) {
+                    @Override
+                    public TestQPMain getTestQPMain() {
+                        return new CustomizedQPMain(context);
+                    }
+                };
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], 
ClientBase.CONNECTION_TIMEOUT, this);
+            }
+            waitForAll(zk, States.CONNECTED);
+            LOG.info("all servers started");
+
+            final String nodePath = "/testDiffSyncAfterSnap";
+
+            // 2. find leader and a follower
+            int leaderId = -1;
+            int followerA = -1;
+            for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    leaderId = i;
+                } else if (followerA == -1) {
+                    followerA = i;
+                }
+            }
+
+            // 3. stop follower A
+            LOG.info("shutdown follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+
+            // 4. issue some traffic
+            int index = 0;
+            int numOfRequests = 10;
+            for (int i = 0; i < numOfRequests; i++) {
+                zk[leaderId].create(nodePath + index++,
+                        new byte[1], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+            }
+
+            CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) 
mt[leaderId].main.quorumPeer;
+
+            // 5. inject fault to cause the follower exit when received 
NEWLEADER
+            contexts[followerA].newLeaderReceivedCallback = new 
NewLeaderReceivedCallback() {
+                boolean processed = false;
+                @Override
+                public void process() throws IOException {
+                    if (processed) {
+                        return;
+                    }
+                    processed = true;
+                    System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, 
"false");
+                    throw new IOException("read timedout");
+                }
+            };
+
+            // 6. force snap sync once
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 7. start follower A
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+            LOG.info("verify the nodes are exist in memory");
+            for (int i = 0; i < index; i++) {
+                assertNotNull(zk[followerA].exists(nodePath + i, false));
+            }
+
+            // 8. issue another request which will be persisted on disk
+            zk[leaderId].create(nodePath + index++,
+                    new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // wait some time to let this get written to disk
+            Thread.sleep(500);
+
+            // 9. reload data from disk and make sure it's still consistent
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+
+            for (int i = 0; i < index; i++) {
+                assertNotNull( "node " + i + " should exist", 
zk[followerA].exists(nodePath + i, false));
+            }
+
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
     static class Context {
 
         boolean quitFollowing = false;
         boolean exitWhenAckNewLeader = false;
         NewLeaderAckCallback newLeaderAckCallback = null;
+        NewLeaderReceivedCallback newLeaderReceivedCallback = null;
 
     }
 
@@ -1638,6 +1766,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
 
     }
 
+    interface NewLeaderReceivedCallback {
+        void process() throws IOException;
+    }
+
     interface StartForwardingListener {
 
         void start();
@@ -1711,6 +1843,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                     }
                     super.writePacket(pp, flush);
                 }
+
+                @Override
+                void readPacket(QuorumPacket qp) throws IOException {
+                    super.readPacket(qp);
+                    if (qp.getType() == Leader.NEWLEADER && 
context.newLeaderReceivedCallback != null) {
+                        context.newLeaderReceivedCallback.process();
+                    }
+                }
             };
         }
 

Reply via email to