This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9fcb805fa ZOOKEEPER-4394: Apply only committed requests in sync with
leader before NEWLEADER ACK
9fcb805fa is described below
commit 9fcb805fa1bb37b1f64ceb3bc5d7ccf349971bb7
Author: AlphaCanisMajoris <[email protected]>
AuthorDate: Thu Sep 19 23:28:02 2024 +0800
ZOOKEEPER-4394: Apply only committed requests in sync with leader before
NEWLEADER ACK
Reviewers: kezhuw, anmolnar, kezhuw
Author: AlphaCanisMajoris
Closes #2152 from AlphaCanisMajoris/ZK-4643
---
.../apache/zookeeper/server/ZooKeeperServer.java | 15 +-
.../server/quorum/FollowerZooKeeperServer.java | 7 +-
.../apache/zookeeper/server/quorum/Learner.java | 80 ++++++----
.../apache/zookeeper/server/quorum/Zab1_0Test.java | 165 ++++++++++++++++++---
4 files changed, 196 insertions(+), 71 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 598a69768..af4147afd 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -793,19 +793,6 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
}
public synchronized void startup() {
- startupWithServerState(State.RUNNING);
- }
-
- public synchronized void startupWithoutServing() {
- startupWithServerState(State.INITIAL);
- }
-
- public synchronized void startServing() {
- setState(State.RUNNING);
- notifyAll();
- }
-
- private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
@@ -820,7 +807,7 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
registerMetrics();
- setState(state);
+ setState(State.RUNNING);
requestPathMetricsCollector.start();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 5af114dcc..b67661999 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -89,12 +89,11 @@ public class FollowerZooKeeperServer extends
LearnerZooKeeperServer {
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
- * @return a request moving through a chain of RequestProcessors
*/
- public Request appendRequest(final TxnHeader hdr, final Record txn, final
TxnDigest digest) throws IOException {
- final Request request = buildRequestToProcess(hdr, txn, digest);
+ public void appendRequest(final TxnHeader hdr, final Record txn, final
TxnDigest digest) throws IOException {
+ final Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, hdr.getZxid());
+ request.setTxnDigest(digest);
getZKDatabase().append(request);
- return request;
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 3fe981cce..bb7be70dd 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -555,8 +555,7 @@ public class Learner {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
- Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
- Deque<Request> requestsToAck = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
@@ -645,11 +644,11 @@ public class Learner {
self.setLastSeenQuorumVerifier(qv, true);
}
- packetsNotCommitted.add(pif);
+ packetsNotLogged.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
- pif = packetsNotCommitted.peekFirst();
+ pif = packetsNotLogged.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() ==
Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new
String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
@@ -668,7 +667,7 @@ public class Learner {
Long.toHexString(pif.hdr.getZxid()));
} else {
zk.processTxn(pif.hdr, pif.rec);
- packetsNotCommitted.remove();
+ packetsNotLogged.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
@@ -710,7 +709,7 @@ public class Learner {
// Apply to db directly if we haven't taken the
snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
- packetsNotCommitted.add(packet);
+ packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}
@@ -753,29 +752,55 @@ public class Learner {
isPreZAB1_0 = false;
// ZOOKEEPER-3911: make sure sync the uncommitted logs
before commit them (ACK NEWLEADER).
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
- if (zk instanceof FollowerZooKeeperServer) {
+ if (zk instanceof FollowerZooKeeperServer &&
!packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
- final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
- requestsToAck.add(request);
+
+ /*
+ * @see https://github.com/apache/zookeeper/pull/1848
+ * Persist and process the committed txns in
"packetsNotLogged"
+ * according to "packetsCommitted", which have been
committed by
+ * the leader. For these committed proposals, there is
no need to
+ * reply ack.
+ *
+ * @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+ * Keep the outstanding proposals in
"packetsNotLogged" to avoid
+ * NullPointerException when the follower receives
COMMIT packet(s)
+ * right after replying NEWLEADER ack.
+ */
+ while (!packetsCommitted.isEmpty()) {
+ long zxid = packetsCommitted.removeFirst();
+ pif = packetsNotLogged.peekFirst();
+ if (pif == null) {
+ LOG.warn("Committing 0x{}, but got no
proposal", Long.toHexString(zxid));
+ continue;
+ } else if (pif.hdr.getZxid() != zxid) {
+ LOG.warn("Committing 0x{}, but next proposal
is 0x{}",
+ Long.toHexString(zxid),
Long.toHexString(pif.hdr.getZxid()));
+ continue;
+ }
+ packetsNotLogged.removeFirst();
+ fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
+ fzk.processTxn(pif.hdr, pif.rec);
}
- // persist the txns to disk
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
+ // Make sure to persist the txns to disk before
replying NEWLEADER ack.
fzk.getZKDatabase().commit();
- LOG.info("{} txns have been persisted and it took
{}ms",
- packetsNotCommitted.size(), Time.currentElapsedTime()
- startTime);
- packetsNotCommitted.clear();
+ LOG.info("It took {}ms to persist and commit txns in
packetsCommitted. "
+ + "{} outstanding txns left in
packetsNotLogged",
+ Time.currentElapsedTime() - startTime,
packetsNotLogged.size());
}
- // set the current epoch after all the tnxs are persisted
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4643
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4785
+ // Update current epoch after the committed txns are
persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
- // send NEWLEADER ack after all the tnxs are persisted
+ // send NEWLEADER ack after the committed txns are
persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid,
null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}",
Long.toHexString(newLeaderZxid));
break;
@@ -784,7 +809,7 @@ public class Learner {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
- zk.startServing();
+ zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
@@ -796,20 +821,11 @@ public class Learner {
// We need to log the stuff that came in between the snapshot and the
uptodate
if (zk instanceof FollowerZooKeeperServer) {
- // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader
shutdown due to timeout
- // on waiting for a quorum of followers
- for (final Request request : requestsToAck) {
- final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK,
request.getHdr().getZxid(), null, null);
- writePacket(ackPacket, false);
- }
- writePacket(null, true);
- requestsToAck.clear();
-
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
+ for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
- LOG.info("{} txns have been logged asynchronously",
packetsNotCommitted.size());
+ LOG.info("{} txns have been logged asynchronously",
packetsNotLogged.size());
for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
@@ -819,7 +835,7 @@ public class Learner {
// Similar to follower, we need to log requests between the
snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
+ for (PacketInFlight p : packetsNotLogged) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 76a678f50..d374062e2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -741,8 +741,8 @@ public class Zab1_0Test extends ZKTestCase {
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
- assertEquals(0, qp.getZxid());
- assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
+ assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());
@@ -765,36 +765,22 @@ public class Zab1_0Test extends ZKTestCase {
qp.setZxid(0);
oa.writeRecord(qp, null);
- // Read the uptodate ack
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACK, qp.getType());
- assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());
-
- //Wait for the transactions to be written out. The thread
that writes them out
- // does not send anything back when it is done.
- long start = System.currentTimeMillis();
- while (createSessionZxid != f.fzk.getLastProcessedZxid()
- && (System.currentTimeMillis() - start) <
50) {
- Thread.sleep(1);
- }
-
assertEquals(createSessionZxid,
f.fzk.getLastProcessedZxid());
+ // Read the uptodate ack
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACK, qp.getType());
+ assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new
FileTxnSnapLog(logDir, snapDir));
- start = System.currentTimeMillis();
zkDb2.loadDataBase();
- while (zkDb2.getSessionWithTimeOuts().isEmpty() &&
(System.currentTimeMillis() - start) < 50) {
- Thread.sleep(1);
- zkDb2.loadDataBase();
- }
LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:{}",
zkDb2.getSessionWithTimeOuts());
assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
@@ -820,6 +806,143 @@ public class Zab1_0Test extends ZKTestCase {
}, testData);
}
+ @Test
+ public void
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File
testData) throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive
oa, Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir", testData);
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir =
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+ File snapDir =
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ //Spy on ZK so we can check if a snapshot happened or not.
+ f.zk = spy(f.zk);
+ try {
+ assertEquals(0, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new
FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33,
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+ Stat stat = new Stat();
+ assertEquals("data1", new String(zkDb.getData("/foo",
stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()),
learnInfo);
+ assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch
is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte[] protoBytes = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACKEPOCH, qp.getType());
+ assertEquals(0, qp.getZxid());
+ assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ Thread.sleep(10); //Give it some time to process the snap
+ //No Snapshot taken yet, the SNAP was applied in memory
+ verify(f.zk, never()).takeSnapshot();
+
+ // Leader sends an outstanding proposal
+ long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+ proposeSetData(qp, proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ qp.setData(null);
+ oa.writeRecord(qp, null);
+
+ // Get the ack of the new leader
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACK, qp.getType());
+ assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(1, f.self.getCurrentEpoch());
+ //Make sure that we did take the snapshot now
+ verify(f.zk).takeSnapshot(true);
+ assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+ // The outstanding proposal has not been persisted yet
+ ZKDatabase zkDb2 = new ZKDatabase(new
FileTxnSnapLog(logDir, snapDir));
+ long lastZxid = zkDb2.loadDataBase();
+ assertEquals("data1", new String(zkDb2.getData("/foo",
stat, null)));
+ assertEquals(firstZxid, lastZxid);
+
+ TrackerWatcher watcher = new TrackerWatcher();
+
+ // The change should not have happened yet
+ assertEquals("data1", new
String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+
+ // Leader commits proposalZxid right after it sends
NEWLEADER to follower
+ qp.setType(Leader.COMMIT);
+ qp.setZxid(proposalZxid);
+ qp.setData(null);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.UPTODATE);
+ qp.setZxid(0);
+ qp.setData(null);
+ oa.writeRecord(qp, null);
+
+ // Read the uptodate ack
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACK, qp.getType());
+ assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACK, qp.getType());
+ assertEquals(proposalZxid, qp.getZxid());
+
+ // The change should happen now
+ watcher.waitForChange();
+ assertEquals("data2", new
String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+
+ // check and make sure the change is persisted
+ zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir,
snapDir));
+ lastZxid = zkDb2.loadDataBase();
+ assertEquals("data2", new String(zkDb2.getData("/foo",
stat, null)));
+ assertEquals(proposalZxid, lastZxid);
+ } finally {
+ TestUtils.deleteFileRecursively(tmpDir);
+ }
+ }
+
+ private void proposeSetData(QuorumPacket qp, long zxid, String
data, int version) throws IOException {
+ qp.setType(Leader.PROPOSAL);
+ qp.setZxid(zxid);
+ TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55,
ZooDefs.OpCode.setData);
+ SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(),
version);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ boa.writeRecord(hdr, null);
+ boa.writeRecord(sdt, null);
+ qp.setData(baos.toByteArray());
+ }
+ }, testData);
+ }
+
@Test
public void testNormalRun(@TempDir File testData) throws Exception {
testLeaderConversation(new LeaderConversation() {