This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.9 by this push:
     new dcb3fc93b Revert "ZOOKEEPER-4394: Apply only committed requests in sync with leader before NEWLEADER ACK"
dcb3fc93b is described below

commit dcb3fc93b431dcdf02f73636c2502f058a5ad347
Author: Andor Molnar <[email protected]>
AuthorDate: Thu Sep 19 11:54:51 2024 -0500

    Revert "ZOOKEEPER-4394: Apply only committed requests in sync with leader before NEWLEADER ACK"
    
    This reverts commit 055b253f3a6028c96c8fc9580e8ad5bf094cb202.
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |  15 +-
 .../server/quorum/FollowerZooKeeperServer.java     |   7 +-
 .../apache/zookeeper/server/quorum/Learner.java    |  80 ++++------
 .../apache/zookeeper/server/quorum/Zab1_0Test.java | 165 +++------------------
 4 files changed, 71 insertions(+), 196 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 9147c3ebf..4ee61ca11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -793,6 +793,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public synchronized void startup() {
+        startupWithServerState(State.RUNNING);
+    }
+
+    public synchronized void startupWithoutServing() {
+        startupWithServerState(State.INITIAL);
+    }
+
+    public synchronized void startServing() {
+        setState(State.RUNNING);
+        notifyAll();
+    }
+
+    private void startupWithServerState(State state) {
         if (sessionTracker == null) {
             createSessionTracker();
         }
@@ -807,7 +820,7 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
 
         registerMetrics();
 
-        setState(State.RUNNING);
+        setState(state);
 
         requestPathMetricsCollector.start();
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index b67661999..5af114dcc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -89,11 +89,12 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
      * @param hdr the txn header
      * @param txn the txn
      * @param digest the digest of txn
+     * @return a request moving through a chain of RequestProcessors
      */
-    public void appendRequest(final TxnHeader hdr, final Record txn, final 
TxnDigest digest) throws IOException {
-        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), 
hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
+    public Request appendRequest(final TxnHeader hdr, final Record txn, final 
TxnDigest digest) throws IOException {
+        final Request request = buildRequestToProcess(hdr, txn, digest);
         getZKDatabase().append(request);
+        return request;
     }
 
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8f3204392..e3bd13d11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -555,7 +555,8 @@ public class Learner {
         boolean syncSnapshot = false;
         readPacket(qp);
         Deque<Long> packetsCommitted = new ArrayDeque<>();
-        Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+        Deque<Request> requestsToAck = new ArrayDeque<>();
 
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
@@ -644,11 +645,11 @@ public class Learner {
                         self.setLastSeenQuorumVerifier(qv, true);
                     }
 
-                    packetsNotLogged.add(pif);
+                    packetsNotCommitted.add(pif);
                     break;
                 case Leader.COMMIT:
                 case Leader.COMMITANDACTIVATE:
-                    pif = packetsNotLogged.peekFirst();
+                    pif = packetsNotCommitted.peekFirst();
                     if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == 
Leader.COMMITANDACTIVATE) {
                         QuorumVerifier qv = self.configFromString(new 
String(((SetDataTxn) pif.rec).getData(), UTF_8));
                         boolean majorChange = self.processReconfig(
@@ -667,7 +668,7 @@ public class Learner {
                                 Long.toHexString(pif.hdr.getZxid()));
                         } else {
                             zk.processTxn(pif.hdr, pif.rec);
-                            packetsNotLogged.remove();
+                            packetsNotCommitted.remove();
                         }
                     } else {
                         packetsCommitted.add(qp.getZxid());
@@ -709,7 +710,7 @@ public class Learner {
                         // Apply to db directly if we haven't taken the 
snapshot
                         zk.processTxn(packet.hdr, packet.rec);
                     } else {
-                        packetsNotLogged.add(packet);
+                        packetsNotCommitted.add(packet);
                         packetsCommitted.add(qp.getZxid());
                     }
 
@@ -752,55 +753,29 @@ public class Learner {
                     isPreZAB1_0 = false;
 
                     // ZOOKEEPER-3911: make sure sync the uncommitted logs 
before commit them (ACK NEWLEADER).
-                    if (zk instanceof FollowerZooKeeperServer && 
!packetsCommitted.isEmpty()) {
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                    zk.startupWithoutServing();
+                    if (zk instanceof FollowerZooKeeperServer) {
                         long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = 
(FollowerZooKeeperServer) zk;
-
-                        /*
-                         * @see https://github.com/apache/zookeeper/pull/1848
-                         * Persist and process the committed txns in 
"packetsNotLogged"
-                         * according to "packetsCommitted", which have been 
committed by
-                         * the leader. For these committed proposals, there is 
no need to
-                         * reply ack.
-                         *
-                         * @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
-                         * Keep the outstanding proposals in 
"packetsNotLogged" to avoid
-                         * NullPointerException when the follower receives 
COMMIT packet(s)
-                         * right after replying NEWLEADER ack.
-                         */
-                        while (!packetsCommitted.isEmpty()) {
-                            long zxid = packetsCommitted.removeFirst();
-                            pif = packetsNotLogged.peekFirst();
-                            if (pif == null) {
-                                LOG.warn("Committing 0x{}, but got no 
proposal", Long.toHexString(zxid));
-                                continue;
-                            } else if (pif.hdr.getZxid() != zxid) {
-                                LOG.warn("Committing 0x{}, but next proposal 
is 0x{}",
-                                        Long.toHexString(zxid), 
Long.toHexString(pif.hdr.getZxid()));
-                                continue;
-                            }
-                            packetsNotLogged.removeFirst();
-                            fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
-                            fzk.processTxn(pif.hdr, pif.rec);
+                        for (PacketInFlight p : packetsNotCommitted) {
+                            final Request request = fzk.appendRequest(p.hdr, 
p.rec, p.digest);
+                            requestsToAck.add(request);
                         }
 
-                        // @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
-                        // Make sure to persist the txns to disk before 
replying NEWLEADER ack.
+                        // persist the txns to disk
                         fzk.getZKDatabase().commit();
-                        LOG.info("It took {}ms to persist and commit txns in 
packetsCommitted. "
-                                        + "{} outstanding txns left in 
packetsNotLogged",
-                                Time.currentElapsedTime() - startTime, 
packetsNotLogged.size());
+                        LOG.info("{} txns have been persisted and it took 
{}ms",
+                        packetsNotCommitted.size(), Time.currentElapsedTime() 
- startTime);
+                        packetsNotCommitted.clear();
                     }
 
-                    // @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4643
-                    // @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4785
-                    // Update current epoch after the committed txns are 
persisted
+                    // set the current epoch after all the tnxs are persisted
                     self.setCurrentEpoch(newEpoch);
                     LOG.info("Set the current epoch to {}", newEpoch);
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
 
-                    // send NEWLEADER ack after the committed txns are 
persisted
+                    // send NEWLEADER ack after all the tnxs are persisted
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, 
null, null), true);
                     LOG.info("Sent NEWLEADER ack to leader with zxid {}", 
Long.toHexString(newLeaderZxid));
                     break;
@@ -809,7 +784,7 @@ public class Learner {
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
-        zk.startup();
+        zk.startServing();
         /*
          * Update the election vote here to ensure that all members of the
          * ensemble report the same vote to new servers that start up and
@@ -821,11 +796,20 @@ public class Learner {
 
         // We need to log the stuff that came in between the snapshot and the 
uptodate
         if (zk instanceof FollowerZooKeeperServer) {
+            // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader 
shutdown due to timeout
+            // on waiting for a quorum of followers
+            for (final Request request : requestsToAck) {
+                final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, 
request.getHdr().getZxid(), null, null);
+                writePacket(ackPacket, false);
+            }
+            writePacket(null, true);
+            requestsToAck.clear();
+
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotLogged) {
+            for (PacketInFlight p : packetsNotCommitted) {
                 fzk.logRequest(p.hdr, p.rec, p.digest);
             }
-            LOG.info("{} txns have been logged asynchronously", 
packetsNotLogged.size());
+            LOG.info("{} txns have been logged asynchronously", 
packetsNotCommitted.size());
 
             for (Long zxid : packetsCommitted) {
                 fzk.commit(zxid);
@@ -835,7 +819,7 @@ public class Learner {
             // Similar to follower, we need to log requests between the 
snapshot
             // and UPTODATE
             ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotLogged) {
+            for (PacketInFlight p : packetsNotCommitted) {
                 Long zxid = packetsCommitted.peekFirst();
                 if (p.hdr.getZxid() != zxid) {
                     // log warning message if there is no matching commit
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index dfb617db1..f0db38e26 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -743,8 +743,8 @@ public class Zab1_0Test extends ZKTestCase {
 
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACKEPOCH, qp.getType());
-                    assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
-                    assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(0, qp.getZxid());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), 
ByteBuffer.wrap(qp.getData()).getInt());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(0, f.self.getCurrentEpoch());
 
@@ -767,22 +767,36 @@ public class Zab1_0Test extends ZKTestCase {
                     qp.setZxid(0);
                     oa.writeRecord(qp, null);
 
-                    // Get the ack of the new leader
+                    // Read the uptodate ack
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACK, qp.getType());
                     assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-                    assertEquals(1, f.self.getAcceptedEpoch());
-                    assertEquals(1, f.self.getCurrentEpoch());
-                    assertEquals(createSessionZxid, 
f.fzk.getLastProcessedZxid());
 
-                    // Read the uptodate ack
+                    // Get the ack of the new leader
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACK, qp.getType());
                     assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(1, f.self.getCurrentEpoch());
+
+                    //Wait for the transactions to be written out. The thread 
that writes them out
+                    // does not send anything back when it is done.
+                    long start = System.currentTimeMillis();
+                    while (createSessionZxid != f.fzk.getLastProcessedZxid()
+                                   && (System.currentTimeMillis() - start) < 
50) {
+                        Thread.sleep(1);
+                    }
+
+                    assertEquals(createSessionZxid, 
f.fzk.getLastProcessedZxid());
 
                     // Make sure the data was recorded in the filesystem ok
                     ZKDatabase zkDb2 = new ZKDatabase(new 
FileTxnSnapLog(logDir, snapDir));
+                    start = System.currentTimeMillis();
                     zkDb2.loadDataBase();
+                    while (zkDb2.getSessionWithTimeOuts().isEmpty() && 
(System.currentTimeMillis() - start) < 50) {
+                        Thread.sleep(1);
+                        zkDb2.loadDataBase();
+                    }
                     LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
                     LOG.info("zkdb2 with timeouts:{}", 
zkDb2.getSessionWithTimeOuts());
                     assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
@@ -805,143 +819,6 @@ public class Zab1_0Test extends ZKTestCase {
                 boa.writeRecord(cst, null);
                 qp.setData(baos.toByteArray());
             }
-        }, testData);
-    }
-
-    @Test
-    public void 
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File 
testData) throws Exception {
-        testFollowerConversation(new FollowerConversation() {
-            @Override
-            public void converseWithFollower(InputArchive ia, OutputArchive 
oa, Follower f) throws Exception {
-                File tmpDir = File.createTempFile("test", "dir", testData);
-                tmpDir.delete();
-                tmpDir.mkdir();
-                File logDir = 
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
-                File snapDir = 
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
-                //Spy on ZK so we can check if a snapshot happened or not.
-                f.zk = spy(f.zk);
-                try {
-                    assertEquals(0, f.self.getAcceptedEpoch());
-                    assertEquals(0, f.self.getCurrentEpoch());
-
-                    // Setup a database with a single /foo node
-                    ZKDatabase zkDb = new ZKDatabase(new 
FileTxnSnapLog(tmpDir, tmpDir));
-                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
-                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, 
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
-                    Stat stat = new Stat();
-                    assertEquals("data1", new String(zkDb.getData("/foo", 
stat, null)));
-
-                    QuorumPacket qp = new QuorumPacket();
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
-                    assertEquals(qp.getZxid(), 0);
-                    LearnerInfo learnInfo = new LearnerInfo();
-                    
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), 
learnInfo);
-                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
-                    assertEquals(learnInfo.getServerid(), 0);
-
-                    // We are simulating an established leader, so the epoch 
is 1
-                    qp.setType(Leader.LEADERINFO);
-                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
-                    byte[] protoBytes = new byte[4];
-                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
-                    qp.setData(protoBytes);
-                    oa.writeRecord(qp, null);
-
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACKEPOCH, qp.getType());
-                    assertEquals(0, qp.getZxid());
-                    assertEquals(ZxidUtils.makeZxid(0, 0), 
ByteBuffer.wrap(qp.getData()).getInt());
-                    assertEquals(1, f.self.getAcceptedEpoch());
-                    assertEquals(0, f.self.getCurrentEpoch());
-
-                    // Send the snapshot we created earlier
-                    qp.setType(Leader.SNAP);
-                    qp.setData(new byte[0]);
-                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
-                    oa.writeRecord(qp, null);
-                    zkDb.serializeSnapshot(oa);
-                    oa.writeString("BenWasHere", null);
-                    Thread.sleep(10); //Give it some time to process the snap
-                    //No Snapshot taken yet, the SNAP was applied in memory
-                    verify(f.zk, never()).takeSnapshot();
-
-                    // Leader sends an outstanding proposal
-                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
-                    proposeSetData(qp, proposalZxid, "data2", 2);
-                    oa.writeRecord(qp, null);
-
-                    qp.setType(Leader.NEWLEADER);
-                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
-                    qp.setData(null);
-                    oa.writeRecord(qp, null);
-
-                    // Get the ack of the new leader
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACK, qp.getType());
-                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-                    assertEquals(1, f.self.getAcceptedEpoch());
-                    assertEquals(1, f.self.getCurrentEpoch());
-                    //Make sure that we did take the snapshot now
-                    verify(f.zk).takeSnapshot(true);
-                    assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
-
-                    // The outstanding proposal has not been persisted yet
-                    ZKDatabase zkDb2 = new ZKDatabase(new 
FileTxnSnapLog(logDir, snapDir));
-                    long lastZxid = zkDb2.loadDataBase();
-                    assertEquals("data1", new String(zkDb2.getData("/foo", 
stat, null)));
-                    assertEquals(firstZxid, lastZxid);
-
-                    TrackerWatcher watcher = new TrackerWatcher();
-
-                    // The change should not have happened yet
-                    assertEquals("data1", new 
String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
-
-                    // Leader commits proposalZxid right after it sends 
NEWLEADER to follower
-                    qp.setType(Leader.COMMIT);
-                    qp.setZxid(proposalZxid);
-                    qp.setData(null);
-                    oa.writeRecord(qp, null);
-
-                    qp.setType(Leader.UPTODATE);
-                    qp.setZxid(0);
-                    qp.setData(null);
-                    oa.writeRecord(qp, null);
-
-                    // Read the uptodate ack
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACK, qp.getType());
-                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACK, qp.getType());
-                    assertEquals(proposalZxid, qp.getZxid());
-
-                    // The change should happen now
-                    watcher.waitForChange();
-                    assertEquals("data2", new 
String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
-
-                    // check and make sure the change is persisted
-                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, 
snapDir));
-                    lastZxid = zkDb2.loadDataBase();
-                    assertEquals("data2", new String(zkDb2.getData("/foo", 
stat, null)));
-                    assertEquals(proposalZxid, lastZxid);
-                } finally {
-                    TestUtils.deleteFileRecursively(tmpDir);
-                }
-            }
-
-            private void proposeSetData(QuorumPacket qp, long zxid, String 
data, int version) throws IOException {
-                qp.setType(Leader.PROPOSAL);
-                qp.setZxid(zxid);
-                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, 
ZooDefs.OpCode.setData);
-                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), 
version);
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
-                boa.writeRecord(hdr, null);
-                boa.writeRecord(sdt, null);
-                qp.setData(baos.toByteArray());
-            }
         });
     }
 

Reply via email to