This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9 by this push:
new dcb3fc93b Revert "ZOOKEEPER-4394: Apply only committed requests in
sync with leader before NEWLEADER ACK"
dcb3fc93b is described below
commit dcb3fc93b431dcdf02f73636c2502f058a5ad347
Author: Andor Molnar <[email protected]>
AuthorDate: Thu Sep 19 11:54:51 2024 -0500
Revert "ZOOKEEPER-4394: Apply only committed requests in sync with leader
before NEWLEADER ACK"
This reverts commit 055b253f3a6028c96c8fc9580e8ad5bf094cb202.
---
.../apache/zookeeper/server/ZooKeeperServer.java | 15 +-
.../server/quorum/FollowerZooKeeperServer.java | 7 +-
.../apache/zookeeper/server/quorum/Learner.java | 80 ++++------
.../apache/zookeeper/server/quorum/Zab1_0Test.java | 165 +++------------------
4 files changed, 71 insertions(+), 196 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 9147c3ebf..4ee61ca11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -793,6 +793,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public synchronized void startup() {
+ startupWithServerState(State.RUNNING);
+ }
+
+ public synchronized void startupWithoutServing() {
+ startupWithServerState(State.INITIAL);
+ }
+
+ public synchronized void startServing() {
+ setState(State.RUNNING);
+ notifyAll();
+ }
+
+ private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
@@ -807,7 +820,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
registerMetrics();
- setState(State.RUNNING);
+ setState(state);
requestPathMetricsCollector.start();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index b67661999..5af114dcc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -89,11 +89,12 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
+ * @return a request moving through a chain of RequestProcessors
*/
- public void appendRequest(final TxnHeader hdr, final Record txn, final
TxnDigest digest) throws IOException {
- final Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, hdr.getZxid());
- request.setTxnDigest(digest);
+ public Request appendRequest(final TxnHeader hdr, final Record txn, final
TxnDigest digest) throws IOException {
+ final Request request = buildRequestToProcess(hdr, txn, digest);
getZKDatabase().append(request);
+ return request;
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8f3204392..e3bd13d11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -555,7 +555,8 @@ public class Learner {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
- Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+ Deque<Request> requestsToAck = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
@@ -644,11 +645,11 @@ public class Learner {
self.setLastSeenQuorumVerifier(qv, true);
}
- packetsNotLogged.add(pif);
+ packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
- pif = packetsNotLogged.peekFirst();
+ pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() ==
Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new
String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
@@ -667,7 +668,7 @@ public class Learner {
Long.toHexString(pif.hdr.getZxid()));
} else {
zk.processTxn(pif.hdr, pif.rec);
- packetsNotLogged.remove();
+ packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
@@ -709,7 +710,7 @@ public class Learner {
// Apply to db directly if we haven't taken the
snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
- packetsNotLogged.add(packet);
+ packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
@@ -752,55 +753,29 @@ public class Learner {
isPreZAB1_0 = false;
// ZOOKEEPER-3911: make sure sync the uncommitted logs
before commit them (ACK NEWLEADER).
- if (zk instanceof FollowerZooKeeperServer &&
!packetsCommitted.isEmpty()) {
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
+ zk.startupWithoutServing();
+ if (zk instanceof FollowerZooKeeperServer) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
-
- /*
- * @see https://github.com/apache/zookeeper/pull/1848
- * Persist and process the committed txns in
"packetsNotLogged"
- * according to "packetsCommitted", which have been
committed by
- * the leader. For these committed proposals, there is
no need to
- * reply ack.
- *
- * @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
- * Keep the outstanding proposals in
"packetsNotLogged" to avoid
- * NullPointerException when the follower receives
COMMIT packet(s)
- * right after replying NEWLEADER ack.
- */
- while (!packetsCommitted.isEmpty()) {
- long zxid = packetsCommitted.removeFirst();
- pif = packetsNotLogged.peekFirst();
- if (pif == null) {
- LOG.warn("Committing 0x{}, but got no
proposal", Long.toHexString(zxid));
- continue;
- } else if (pif.hdr.getZxid() != zxid) {
- LOG.warn("Committing 0x{}, but next proposal
is 0x{}",
- Long.toHexString(zxid),
Long.toHexString(pif.hdr.getZxid()));
- continue;
- }
- packetsNotLogged.removeFirst();
- fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
- fzk.processTxn(pif.hdr, pif.rec);
+ for (PacketInFlight p : packetsNotCommitted) {
+ final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
+ requestsToAck.add(request);
}
- // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
- // Make sure to persist the txns to disk before
replying NEWLEADER ack.
+ // persist the txns to disk
fzk.getZKDatabase().commit();
- LOG.info("It took {}ms to persist and commit txns in
packetsCommitted. "
- + "{} outstanding txns left in
packetsNotLogged",
- Time.currentElapsedTime() - startTime,
packetsNotLogged.size());
+ LOG.info("{} txns have been persisted and it took
{}ms",
+ packetsNotCommitted.size(), Time.currentElapsedTime()
- startTime);
+ packetsNotCommitted.clear();
}
- // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4643
- // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4785
- // Update current epoch after the committed txns are
persisted
+ // set the current epoch after all the tnxs are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- // send NEWLEADER ack after the committed txns are
persisted
+ // send NEWLEADER ack after all the tnxs are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid,
null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}",
Long.toHexString(newLeaderZxid));
break;
@@ -809,7 +784,7 @@ public class Learner {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
- zk.startup();
+ zk.startServing();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
@@ -821,11 +796,20 @@ public class Learner {
// We need to log the stuff that came in between the snapshot and the
uptodate
if (zk instanceof FollowerZooKeeperServer) {
+ // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader
shutdown due to timeout
+ // on waiting for a quorum of followers
+ for (final Request request : requestsToAck) {
+ final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK,
request.getHdr().getZxid(), null, null);
+ writePacket(ackPacket, false);
+ }
+ writePacket(null, true);
+ requestsToAck.clear();
+
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotLogged) {
+ for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
- LOG.info("{} txns have been logged asynchronously",
packetsNotLogged.size());
+ LOG.info("{} txns have been logged asynchronously",
packetsNotCommitted.size());
for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
@@ -835,7 +819,7 @@ public class Learner {
// Similar to follower, we need to log requests between the
snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotLogged) {
+ for (PacketInFlight p : packetsNotCommitted) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index dfb617db1..f0db38e26 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -743,8 +743,8 @@ public class Zab1_0Test extends ZKTestCase {
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
- assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
- assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(0, qp.getZxid());
+ assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());
@@ -767,22 +767,36 @@ public class Zab1_0Test extends ZKTestCase {
qp.setZxid(0);
oa.writeRecord(qp, null);
- // Get the ack of the new leader
+ // Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
- assertEquals(1, f.self.getAcceptedEpoch());
- assertEquals(1, f.self.getCurrentEpoch());
- assertEquals(createSessionZxid,
f.fzk.getLastProcessedZxid());
- // Read the uptodate ack
+ // Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(1, f.self.getCurrentEpoch());
+
+ //Wait for the transactions to be written out. The thread
that writes them out
+ // does not send anything back when it is done.
+ long start = System.currentTimeMillis();
+ while (createSessionZxid != f.fzk.getLastProcessedZxid()
+ && (System.currentTimeMillis() - start) <
50) {
+ Thread.sleep(1);
+ }
+
+ assertEquals(createSessionZxid,
f.fzk.getLastProcessedZxid());
// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new
FileTxnSnapLog(logDir, snapDir));
+ start = System.currentTimeMillis();
zkDb2.loadDataBase();
+ while (zkDb2.getSessionWithTimeOuts().isEmpty() &&
(System.currentTimeMillis() - start) < 50) {
+ Thread.sleep(1);
+ zkDb2.loadDataBase();
+ }
LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:{}",
zkDb2.getSessionWithTimeOuts());
assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
@@ -805,143 +819,6 @@ public class Zab1_0Test extends ZKTestCase {
boa.writeRecord(cst, null);
qp.setData(baos.toByteArray());
}
- }, testData);
- }
-
- @Test
- public void
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File
testData) throws Exception {
- testFollowerConversation(new FollowerConversation() {
- @Override
- public void converseWithFollower(InputArchive ia, OutputArchive
oa, Follower f) throws Exception {
- File tmpDir = File.createTempFile("test", "dir", testData);
- tmpDir.delete();
- tmpDir.mkdir();
- File logDir =
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
- File snapDir =
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
- //Spy on ZK so we can check if a snapshot happened or not.
- f.zk = spy(f.zk);
- try {
- assertEquals(0, f.self.getAcceptedEpoch());
- assertEquals(0, f.self.getCurrentEpoch());
-
- // Setup a database with a single /foo node
- ZKDatabase zkDb = new ZKDatabase(new
FileTxnSnapLog(tmpDir, tmpDir));
- final long firstZxid = ZxidUtils.makeZxid(1, 1);
- zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33,
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
- Stat stat = new Stat();
- assertEquals("data1", new String(zkDb.getData("/foo",
stat, null)));
-
- QuorumPacket qp = new QuorumPacket();
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.FOLLOWERINFO, qp.getType());
- assertEquals(qp.getZxid(), 0);
- LearnerInfo learnInfo = new LearnerInfo();
-
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()),
learnInfo);
- assertEquals(learnInfo.getProtocolVersion(), 0x10000);
- assertEquals(learnInfo.getServerid(), 0);
-
- // We are simulating an established leader, so the epoch
is 1
- qp.setType(Leader.LEADERINFO);
- qp.setZxid(ZxidUtils.makeZxid(1, 0));
- byte[] protoBytes = new byte[4];
- ByteBuffer.wrap(protoBytes).putInt(0x10000);
- qp.setData(protoBytes);
- oa.writeRecord(qp, null);
-
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACKEPOCH, qp.getType());
- assertEquals(0, qp.getZxid());
- assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
- assertEquals(1, f.self.getAcceptedEpoch());
- assertEquals(0, f.self.getCurrentEpoch());
-
- // Send the snapshot we created earlier
- qp.setType(Leader.SNAP);
- qp.setData(new byte[0]);
- qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
- oa.writeRecord(qp, null);
- zkDb.serializeSnapshot(oa);
- oa.writeString("BenWasHere", null);
- Thread.sleep(10); //Give it some time to process the snap
- //No Snapshot taken yet, the SNAP was applied in memory
- verify(f.zk, never()).takeSnapshot();
-
- // Leader sends an outstanding proposal
- long proposalZxid = ZxidUtils.makeZxid(1, 1001);
- proposeSetData(qp, proposalZxid, "data2", 2);
- oa.writeRecord(qp, null);
-
- qp.setType(Leader.NEWLEADER);
- qp.setZxid(ZxidUtils.makeZxid(1, 0));
- qp.setData(null);
- oa.writeRecord(qp, null);
-
- // Get the ack of the new leader
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACK, qp.getType());
- assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
- assertEquals(1, f.self.getAcceptedEpoch());
- assertEquals(1, f.self.getCurrentEpoch());
- //Make sure that we did take the snapshot now
- verify(f.zk).takeSnapshot(true);
- assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
-
- // The outstanding proposal has not been persisted yet
- ZKDatabase zkDb2 = new ZKDatabase(new
FileTxnSnapLog(logDir, snapDir));
- long lastZxid = zkDb2.loadDataBase();
- assertEquals("data1", new String(zkDb2.getData("/foo",
stat, null)));
- assertEquals(firstZxid, lastZxid);
-
- TrackerWatcher watcher = new TrackerWatcher();
-
- // The change should not have happened yet
- assertEquals("data1", new
String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
-
- // Leader commits proposalZxid right after it sends
NEWLEADER to follower
- qp.setType(Leader.COMMIT);
- qp.setZxid(proposalZxid);
- qp.setData(null);
- oa.writeRecord(qp, null);
-
- qp.setType(Leader.UPTODATE);
- qp.setZxid(0);
- qp.setData(null);
- oa.writeRecord(qp, null);
-
- // Read the uptodate ack
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACK, qp.getType());
- assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACK, qp.getType());
- assertEquals(proposalZxid, qp.getZxid());
-
- // The change should happen now
- watcher.waitForChange();
- assertEquals("data2", new
String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
-
- // check and make sure the change is persisted
- zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir,
snapDir));
- lastZxid = zkDb2.loadDataBase();
- assertEquals("data2", new String(zkDb2.getData("/foo",
stat, null)));
- assertEquals(proposalZxid, lastZxid);
- } finally {
- TestUtils.deleteFileRecursively(tmpDir);
- }
- }
-
- private void proposeSetData(QuorumPacket qp, long zxid, String
data, int version) throws IOException {
- qp.setType(Leader.PROPOSAL);
- qp.setZxid(zxid);
- TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55,
ZooDefs.OpCode.setData);
- SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(),
version);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OutputArchive boa = BinaryOutputArchive.getArchive(baos);
- boa.writeRecord(hdr, null);
- boa.writeRecord(sdt, null);
- qp.setData(baos.toByteArray());
- }
});
}