This is an automated email from the ASF dual-hosted git repository.
arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.6 by this push:
new a53cfeb ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
a53cfeb is described below
commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a
Author: Mukti Krishnan <[email protected]>
AuthorDate: Tue Mar 9 21:46:52 2021 +0530
ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
Author: Mukti <[email protected]>
Reviewers: Mohammad Arshad <[email protected]>
Closes #1619 from MuktiKrishnan/ZOOKEEPER-4225 and squashes the following commits:
cb7a9868f [Mukti] ZOOKEEPER-1871: Corrected argument order of assert statement
51a489f2d [Mukti] ZOOKEEPER-4225: Backport ZOOKEEPER-3642 to branch-3.6
---
.../apache/zookeeper/server/ZooKeeperServer.java | 3 +
.../apache/zookeeper/server/quorum/Learner.java | 4 +-
.../server/quorum/QuorumPeerMainTest.java | 140 +++++++++++++++++++++
3 files changed, 146 insertions(+), 1 deletion(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 64ef6af..6a58dc1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -802,6 +802,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
+ if (fullyShutDown && zkDb != null) {
+ zkDb.clear();
+ }
LOG.debug("ZooKeeper server is not running, so not proceeding to
shutdown!");
return;
}
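For context on the hunk above: when a full shutdown is requested on a server that never reached the running state (for example a follower whose SNAP sync was cut short), the in-memory database is now cleared instead of being left behind by the early return. A minimal sketch of that guard, using hypothetical MiniServer/InMemoryDb stand-ins rather than the real ZooKeeperServer/ZKDatabase classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MiniServer {

    /** Hypothetical stand-in for the server's in-memory database. */
    static class InMemoryDb {
        final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

        void clear() {
            nodes.clear();
        }
    }

    private volatile boolean running = false;
    private InMemoryDb db = new InMemoryDb();

    // Mirrors the shape of the patched shutdown(boolean): even if the
    // server is not running, a "full" shutdown still discards the
    // in-memory database instead of returning early with stale state.
    public synchronized void shutdown(boolean fullyShutDown) {
        if (!running) {
            if (fullyShutDown && db != null) {
                db.clear();
            }
            return;
        }
        running = false;
        if (fullyShutDown && db != null) {
            db.clear();
        }
    }
}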
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 4f48d97..fa7cc59 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -802,7 +802,9 @@ public class Learner {
closeSocket();
// shutdown previous zookeeper
if (zk != null) {
- zk.shutdown();
+ // If we haven't finished SNAP sync, force fully shutdown
+ // to avoid potential inconsistency
+ zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
}
}
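A sketch of the decision the Learner change encodes: only an interrupted SNAP sync forces the full shutdown (and therefore the database clear shown above); a broken DIFF sync keeps the ordinary shutdown path. The names below (SyncMode, ZkServer, shutdownAfterBrokenSync) are illustrative stand-ins, not the real QuorumPeer/Learner API:

public class LearnerShutdownSketch {

    enum SyncMode { NONE, DIFF, SNAP }

    interface ZkServer {
        void shutdown(boolean fullyShutDown);
    }

    // If the sync that just failed was a SNAP sync, the in-memory database
    // may hold a partially applied snapshot, so request a full shutdown;
    // otherwise an ordinary shutdown is enough.
    static void shutdownAfterBrokenSync(ZkServer zk, SyncMode syncMode) {
        if (zk != null) {
            zk.shutdown(syncMode == SyncMode.SNAP);
        }
    }

    public static void main(String[] args) {
        ZkServer fake = fullyShutDown ->
            System.out.println("shutdown(fullyShutDown=" + fullyShutDown + ")");
        shutdownAfterBrokenSync(fake, SyncMode.SNAP); // prints true
        shutdownAfterBrokenSync(fake, SyncMode.DIFF); // prints false
    }
}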
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index fed5fea..2cd4d00 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -1624,11 +1624,139 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
assertTrue("complains about metrics provider
MetricsProviderLifeCycleException", found);
}
+ /**
+ * If the learner fails a SNAP sync with the leader before it has
+ * written the snapshot to disk, it may later do a DIFF sync with a
+ * new leader or be elected leader itself.
+ *
+ * This test tries to guarantee there is no data inconsistency in
+ * that case.
+ */
+ @Test
+ public void testDiffSyncAfterSnap() throws Exception {
+ final int ENSEMBLE_SERVERS = 3;
+ MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+ ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+ try {
+ // 1. start a quorum
+ final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique()
+ + ":participant;127.0.0.1:" + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+
+ // start servers
+ Context[] contexts = new Context[ENSEMBLE_SERVERS];
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ final Context context = new Context();
+ contexts[i] = context;
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+ @Override
+ public TestQPMain getTestQPMain() {
+ return new CustomizedQPMain(context);
+ }
+ };
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ }
+ waitForAll(zk, States.CONNECTED);
+ LOG.info("all servers started");
+
+ final String nodePath = "/testDiffSyncAfterSnap";
+
+ // 2. find leader and a follower
+ int leaderId = -1;
+ int followerA = -1;
+ for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ leaderId = i;
+ } else if (followerA == -1) {
+ followerA = i;
+ }
+ }
+
+ // 3. stop follower A
+ LOG.info("shutdown follower {}", followerA);
+ mt[followerA].shutdown();
+ waitForOne(zk[followerA], States.CONNECTING);
+
+ // 4. issue some traffic
+ int index = 0;
+ int numOfRequests = 10;
+ for (int i = 0; i < numOfRequests; i++) {
+ zk[leaderId].create(nodePath + index++,
+ new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+ // 5. inject fault to cause the follower to exit when it receives NEWLEADER
+ contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
+ boolean processed = false;
+ @Override
+ public void process() throws IOException {
+ if (processed) {
+ return;
+ }
+ processed = true;
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+ throw new IOException("read timedout");
+ }
+ };
+
+ // 6. force snap sync once
+ LOG.info("force snapshot sync");
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+ // 7. start follower A
+ mt[followerA].start();
+ waitForOne(zk[followerA], States.CONNECTED);
+ LOG.info("verify the nodes are exist in memory");
+ for (int i = 0; i < index; i++) {
+ assertNotNull(zk[followerA].exists(nodePath + i, false));
+ }
+
+ // 8. issue another request which will be persisted on disk
+ zk[leaderId].create(nodePath + index++,
+ new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // wait some time to let this get written to disk
+ Thread.sleep(500);
+
+ // 9. reload data from disk and make sure it's still consistent
+ LOG.info("restarting follower {}", followerA);
+ mt[followerA].shutdown();
+ waitForOne(zk[followerA], States.CONNECTING);
+ mt[followerA].start();
+ waitForOne(zk[followerA], States.CONNECTED);
+
+ for (int i = 0; i < index; i++) {
+ assertNotNull("node " + i + " should exist", zk[followerA].exists(nodePath + i, false));
+ }
+
+ } finally {
+ System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ mt[i].shutdown();
+ zk[i].close();
+ }
+ }
+ }
+
static class Context {
boolean quitFollowing = false;
boolean exitWhenAckNewLeader = false;
NewLeaderAckCallback newLeaderAckCallback = null;
+ NewLeaderReceivedCallback newLeaderReceivedCallback = null;
}
@@ -1638,6 +1766,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
+ interface NewLeaderReceivedCallback {
+ void process() throws IOException;
+ }
+
interface StartForwardingListener {
void start();
@@ -1711,6 +1843,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
super.writePacket(pp, flush);
}
+
+ @Override
+ void readPacket(QuorumPacket qp) throws IOException {
+ super.readPacket(qp);
+ if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) {
+ context.newLeaderReceivedCallback.process();
+ }
+ }
};
}
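The test added above relies on a one-shot fault: the injected newLeaderReceivedCallback aborts the first SNAP sync right after NEWLEADER arrives (and flips FORCE_SNAP_SYNC back to "false"), then becomes a no-op so the retried sync can succeed. A self-contained sketch of that one-shot callback pattern (FaultCallback and the messages are illustrative, not the test's actual types):

import java.io.IOException;

public class OneShotFaultSketch {

    interface FaultCallback {
        void process() throws IOException;
    }

    public static void main(String[] args) {
        // Fails the first invocation only, then becomes a no-op, so a
        // retried sync can go through.
        FaultCallback callback = new FaultCallback() {
            boolean fired = false;

            @Override
            public void process() throws IOException {
                if (fired) {
                    return;
                }
                fired = true;
                throw new IOException("simulated read timeout");
            }
        };

        for (int attempt = 1; attempt <= 2; attempt++) {
            try {
                callback.process();
                System.out.println("attempt " + attempt + ": passed");
            } catch (IOException e) {
                System.out.println("attempt " + attempt + ": " + e.getMessage());
            }
        }
    }
}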