This is an automated email from the ASF dual-hosted git repository.
li4wang pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9 by this push:
new 4a49d4587 ZOOKEEPER-4785: Txn loss due to race condition in
Learner.syncWithLeader() during DIFF sync (#2111) (#2133)
4a49d4587 is described below
commit 4a49d458735b057dbf0638abe162c773e11f4b1f
Author: li4wang <[email protected]>
AuthorDate: Mon Feb 12 11:58:57 2024 -0800
ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLeader()
during DIFF sync (#2111) (#2133)
Author: Li Wang <[email protected]>
Co-authored-by: liwang <[email protected]>
---
.../server/quorum/FastLeaderElection.java | 15 +-
.../server/quorum/FollowerZooKeeperServer.java | 34 ++-
.../apache/zookeeper/server/quorum/Learner.java | 31 +-
.../zookeeper/server/quorum/DIFFSyncTest.java | 316 +++++++++++++++++++++
.../server/quorum/QuorumPeerTestBase.java | 12 +
5 files changed, 394 insertions(+), 14 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
index f4a5f9882..5a6b18fd1 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java
@@ -702,14 +702,14 @@ public class FastLeaderElection implements Election {
qv.toString().getBytes(UTF_8));
LOG.debug(
- "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{}
(n.round), {} (recipient),"
- + " {} (myid), 0x{} (n.peerEpoch) ",
+ "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{}
(n.peerEpoch), 0x{} (n.round), {} (recipient),"
+ + " {} (myid) ",
proposedLeader,
Long.toHexString(proposedZxid),
+ Long.toHexString(proposedEpoch),
Long.toHexString(logicalclock.get()),
sid,
- self.getMyId(),
- Long.toHexString(proposedEpoch));
+ self.getMyId());
sendqueue.offer(notmsg);
}
@@ -722,12 +722,13 @@ public class FastLeaderElection implements Election {
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long
newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
- "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
+ "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch:
0x{}, proposed epoch: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
- Long.toHexString(curZxid));
-
+ Long.toHexString(curZxid),
+ Long.toHexString(newEpoch),
+ Long.toHexString(curEpoch));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 82e56c390..5af114dcc 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -80,14 +80,23 @@ public class FollowerZooKeeperServer extends
LearnerZooKeeperServer {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
- Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, hdr.getZxid());
- request.setTxnDigest(digest);
- if ((request.zxid & 0xffffffffL) != 0) {
- pendingTxns.add(request);
- }
+ final Request request = buildRequestToProcess(hdr, txn, digest);
syncProcessor.processRequest(request);
}
+    /**
+     * Build a request for the txn and append it directly to the transaction log through the ZKDatabase.
+     * <p>
+     * Unlike {@link #logRequest(TxnHeader, Record, TxnDigest)}, the request is NOT handed to syncProcessor,
+     * so nothing in the request-processor chain persists or ACKs it: the caller is responsible for
+     * persisting it (e.g. via {@code getZKDatabase().commit()}) and for any ACK it needs to send.
+     * As with logRequest, the request is registered in pendingTxns (unless the low 32 bits of its zxid
+     * are 0) so that a later COMMIT for its zxid can be matched against it.
+     *
+     * @param hdr the txn header
+     * @param txn the txn
+     * @param digest the digest of txn
+     * @return the request that was appended to the transaction log
+     * @throws IOException if appending the txn to the transaction log fails
+     */
+    public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
+        final Request request = buildRequestToProcess(hdr, txn, digest);
+        getZKDatabase().append(request);
+        return request;
+    }
+
/**
* When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
@@ -181,4 +190,19 @@ public class FollowerZooKeeperServer extends
LearnerZooKeeperServer {
}
+    /**
+     * Build a request for the txn and, when the low 32 bits of its zxid are non-zero, register it in
+     * pendingTxns so that a later COMMIT for that zxid can be matched against it.
+     * <p>
+     * Shared by {@link #logRequest(TxnHeader, Record, TxnDigest)} and
+     * {@link #appendRequest(TxnHeader, Record, TxnDigest)}. Note that the pendingTxns registration
+     * happens here, i.e. before the request is handed to syncProcessor or appended to the log.
+     *
+     * @param hdr the txn header
+     * @param txn the txn
+     * @param digest the digest of txn
+     * @return the newly built request, with the txn digest attached
+     */
+    private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
+        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
+        request.setTxnDigest(digest);
+        // NOTE(review): a zxid whose low 32 bits (the per-epoch counter) are 0 is presumably an
+        // epoch-boundary zxid that never gets its own COMMIT, hence it is not tracked - confirm
+        if ((request.zxid & 0xffffffffL) != 0) {
+            pendingTxns.add(request);
+        }
+        return request;
+    }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index d534b8f45..e3bd13d11 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -556,6 +556,8 @@ public class Learner {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+ Deque<Request> requestsToAck = new ArrayDeque<>();
+
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}",
Long.toHexString(qp.getZxid()));
@@ -745,7 +747,7 @@ public class Learner {
zk.takeSnapshot(syncSnapshot);
}
- self.setCurrentEpoch(newEpoch);
+
writeToTxnLog = true;
//Anything after this needs to go to the transaction log,
not applied directly in memory
isPreZAB1_0 = false;
@@ -755,14 +757,27 @@ public class Learner {
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
+ long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
+ final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
+ requestsToAck.add(request);
}
+
+ // persist the txns to disk
+ fzk.getZKDatabase().commit();
+ LOG.info("{} txns have been persisted and it took
{}ms",
+ packetsNotCommitted.size(), Time.currentElapsedTime()
- startTime);
packetsNotCommitted.clear();
}
+ // set the current epoch after all the tnxs are persisted
+ self.setCurrentEpoch(newEpoch);
+ LOG.info("Set the current epoch to {}", newEpoch);
+
+ // send NEWLEADER ack after all the tnxs are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid,
null, null), true);
+ LOG.info("Sent NEWLEADER ack to leader with zxid {}",
Long.toHexString(newLeaderZxid));
break;
}
}
@@ -781,13 +796,25 @@ public class Learner {
// We need to log the stuff that came in between the snapshot and the
uptodate
if (zk instanceof FollowerZooKeeperServer) {
+ // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader
shutdown due to timeout
+ // on waiting for a quorum of followers
+ for (final Request request : requestsToAck) {
+ final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK,
request.getHdr().getZxid(), null, null);
+ writePacket(ackPacket, false);
+ }
+ writePacket(null, true);
+ requestsToAck.clear();
+
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
+ LOG.info("{} txns have been logged asynchronously",
packetsNotCommitted.size());
+
for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
}
+ LOG.info("{} txns have been committed", packetsCommitted.size());
} else if (zk instanceof ObserverZooKeeperServer) {
// Similar to follower, we need to log requests between the
snapshot
// and UPTODATE
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java
new file mode 100644
index 000000000..a9be09f39
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerListener;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+public class DIFFSyncTest extends QuorumPeerTestBase {
+ private static final int SERVER_COUNT = 3;
+ private static final String PATH_PREFIX = "/test_";
+
+ private int[] clientPorts;
+ private MainThread[] mt;
+ private ZooKeeper[] zkClients;
+
+ @BeforeEach
+ public void start() throws Exception {
+ clientPorts = new int[SERVER_COUNT];
+ mt = startQuorum(clientPorts);
+ zkClients = new ZooKeeper[SERVER_COUNT];
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception{
+ for (final ZooKeeper zk : zkClients) {
+ try {
+ if (zk != null) {
+ zk.close();
+ }
+ } catch (final InterruptedException e) {
+ LOG.warn("ZooKeeper interrupted while shutting it down", e);
+ }
+ }
+
+ for (final MainThread mainThread : mt) {
+ try {
+ mainThread.shutdown();
+ } catch (final InterruptedException e) {
+ LOG.warn("Quorum Peer interrupted while shutting it down", e);
+ }
+ }
+ }
+
+ @Test
+ @Timeout(value = 120)
+ public void testTxnLoss_FailToPersistAndCommitTxns() throws Exception {
+ final List<String> paths = new ArrayList<>();
+ assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+ // create a ZK client to the leader (currentEpoch=1,
lastLoggedZxid=<1, 1>)
+ createZKClient(2);
+
+ // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
+ paths.add(createNode(zkClients[2], PATH_PREFIX + "0"));
+
+ // shut down S0
+ mt[0].shutdown();
+ LOG.info("S0 shutdown.");
+
+ // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1
txn behind
+ paths.add(createNode(zkClients[2], PATH_PREFIX + "1"));
+ logEpochsAndLastLoggedTxnForAllServers();
+
+ // shut down S1
+ mt[1].shutdown();
+ LOG.info("S1 shutdown.");
+
+ // restart S0 and trigger a new leader election (currentEpoch=2)
+ // S0 starts with MockSyncRequestProcessor and MockCommitProcessor to
simulate it writes the
+ // currentEpoch and sends NEWLEADER ACK but fails to persist and
commit txns afterwards
+ // in DIFF sync
+ mt[0].start(new MockTestQPMain());
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
CONNECTION_TIMEOUT),
+ "waiting for server 0 being up");
+ LOG.info("S0 restarted.");
+ logEpochsAndLastLoggedTxnForAllServers();
+
+ // validate S2 is still the leader
+ assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+ // shut down the leader (i.e. S2). This causes S0 disconnects from
leader, performs partial
+ // shutdown, fast forwards its database to the latest persisted tnx
(i.e. <1, 3>) and change
+ // its state to LOOKING
+ mt[2].shutdown();
+ LOG.info("S2 shutdown.");
+
+ // start S1 and trigger a leader election (currentEpoch=3)
+ mt[1].start();
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
CONNECTION_TIMEOUT),
+ "waiting for server 1 being up");
+ LOG.info("S1 restarted.");
+ logEpochsAndLastLoggedTxnForAllServers();
+
+ // validate S0 is the new leader because of it has higher epoch
+ assertEquals(0, mt[0].getQuorumPeer().getLeaderId());
+
+ // connect to the new leader (i.e. S0) (currentEpoch=3,
lastLoggedZxid=<3, 1>
+ createZKClient(0);
+
+ // create a znode (currentEpoch=3, lastLoggedZxid=<3, 2>)
+ paths.add(createNode(zkClients[0], PATH_PREFIX + "3"));
+
+ // start S2 which is the old leader
+ mt[2].start();
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
CONNECTION_TIMEOUT),
+ "waiting for server " + 2 + " being up");
+ LOG.info("S2 restarted.");
+ logEpochsAndLastLoggedTxnForAllServers();
+
+ // validate all the znodes exist from all the clients
+ validateDataFromAllClients(paths);
+ }
+
+ @Test
+ @Timeout(value = 120)
+ public void testLeaderShutdown_AckProposalBeforeAckNewLeader() throws
Exception {
+ assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+ // create a ZK client to the leader (currentEpoch=1,
lastLoggedZxid=<1, 1>)
+ createZKClient(2);
+
+ // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
+ createNode(zkClients[2], PATH_PREFIX + "0");
+
+ // shut down S0
+ mt[0].shutdown();
+ LOG.info("S0 shutdown.");
+
+ // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1
txn behind
+ createNode(zkClients[2], PATH_PREFIX + "1");
+ logEpochsAndLastLoggedTxnForAllServers();
+
+ // shut down S1
+ mt[1].shutdown();
+ LOG.info("S1 shutdown.");
+
+ // restart S0 and trigger a new leader election and DIFF sync
(currentEpoch=2)
+ mt[0].start();
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
CONNECTION_TIMEOUT),
+ "waiting for server 0 being up");
+ LOG.info("S0 restarted.");
+
+ // create a znode (currentEpoch=2, lastLoggedZxid=<2, 1>)
+ createNode(zkClients[2], PATH_PREFIX + "2");
+
+ // validate quorum is up without additional round of leader election
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != 1) {
+ final QuorumPeer qp = mt[i].getQuorumPeer();
+ assertNotNull(qp);
+ assertEquals(2, qp.getCurrentEpoch());
+ assertEquals(2, qp.getAcceptedEpoch());
+ assertEquals("200000001",
Long.toHexString(qp.getLastLoggedZxid()));
+ }
+ }
+ }
+
+ private MainThread[] startQuorum(final int[] clientPorts) throws
IOException {
+ final StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() +
":" + PortAssignment.unique()
+ + ":participant;127.0.0.1:" + clientPorts[i];
+ sb.append(server);
+ sb.append("\n");
+ }
+
+ final MainThread[] mt = new MainThread[SERVER_COUNT];
+
+ // start all the servers
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ mt[i] = new MainThread(i, clientPorts[i], sb.toString(), false);
+ mt[i].start();
+ }
+
+ // ensure all servers started
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ assertTrue(
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT),
+ "waiting for server " + i + " being up");
+ }
+ return mt;
+ }
+
+ private void createZKClient(final int idx) throws Exception {
+ zkClients[idx] = null;
+ final ClientBase.CountdownWatcher watch = new
ClientBase.CountdownWatcher();
+ zkClients[idx] = new ZooKeeper("127.0.0.1:" + clientPorts[idx],
ClientBase.CONNECTION_TIMEOUT, watch);
+ watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+ }
+
+ private String createNode(final ZooKeeper zk, final String path) throws
Exception {
+ final String fullPath = zk.create(path, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertNotNull(zk.exists(path, false));
+ return fullPath;
+ }
+
+ private static class MockTestQPMain extends TestQPMain {
+ @Override
+ protected QuorumPeer getQuorumPeer() throws SaslException {
+ return new TestQuorumPeer();
+ }
+ }
+
+ private static class TestQuorumPeer extends QuorumPeer {
+ public TestQuorumPeer() throws SaslException {
+ }
+
+ @Override
+ protected Follower makeFollower(FileTxnSnapLog logFactory) throws
IOException {
+ final FollowerZooKeeperServer followerZookeeperServer = new
FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new
FinalRequestProcessor(this);
+ commitProcessor = new MockCommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
+ commitProcessor.start();
+
+ firstProcessor = new FollowerRequestProcessor(this,
commitProcessor);
+ ((FollowerRequestProcessor) firstProcessor).start();
+ syncProcessor = new MockSyncRequestProcessor(this, new
SendAckRequestProcessor(getFollower()));
+
+ syncProcessor.start();
+ }
+ };
+ return new Follower(this, followerZookeeperServer);
+ }
+ }
+
+ private static class MockSyncRequestProcessor extends SyncRequestProcessor
{
+ public MockSyncRequestProcessor(final ZooKeeperServer zks, final
RequestProcessor nextProcessor) {
+ super(zks, nextProcessor);
+ }
+
+ @Override
+ public void processRequest(final Request request) {
+ LOG.info("Sync request for zxid {} is dropped",
Long.toHexString(request.getHdr().getZxid()));
+ }
+ }
+
+ private static class MockCommitProcessor extends CommitProcessor {
+ public MockCommitProcessor(final RequestProcessor nextProcessor, final
String id,
+ final boolean matchSyncs, final
ZooKeeperServerListener listener) {
+
+ super(nextProcessor, id, matchSyncs, listener);
+ }
+
+ @Override
+ public void commit(final Request request) {
+ LOG.info("Commit request for zxid {} is dropped",
Long.toHexString(request.getHdr().getZxid()));
+ }
+ }
+
+ private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ final QuorumPeer qp = mt[i].getQuorumPeer();
+ if (qp != null) {
+ LOG.info(String.format("server id=%d, acceptedEpoch=%d,
currentEpoch=%d, lastLoggedTxn=%s",
+ qp.getMyId(), qp.getAcceptedEpoch(),
+ qp.getCurrentEpoch(),
Long.toHexString(qp.getLastLoggedZxid())));
+ }
+ }
+ }
+
+ private void validateDataFromAllClients(final List<String> paths) throws
Exception{
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (zkClients[i] == null) {
+ createZKClient(i);
+ }
+
+ for (final String path : paths) {
+ assertNotNull(zkClients[i].exists(path, false), "znode " +
path + " is missing");
+ }
+ assertEquals(3, paths.size());
+ }
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 754dca656..c3842808a 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -318,6 +318,18 @@ public class QuorumPeerTestBase extends ZKTestCase
implements Watcher {
currentThread.start();
}
+        /**
+         * Runs the quorum peer on a newly created thread, driven by the supplied {@link TestQPMain}
+         * (for example a subclass overriding {@code getQuorumPeer()} to build a customized peer).
+         *
+         * @param testQPMain the TestQPMain this MainThread will run
+         */
+        public synchronized void start(final TestQPMain testQPMain) {
+            this.main = testQPMain;
+            final Thread runner = new Thread(this);
+            currentThread = runner;
+            runner.start();
+        }
+
public TestQPMain getTestQPMain() {
return new TestQPMain();
}