Repository: zookeeper
Updated Branches:
  refs/heads/master f0b67b6e4 -> 722ba9409


ZOOKEEPER-2845: Apply commit log when restarting server.

I will be creating a patch/pull request for 3.4 and 3.5 too, but I wanted to 
get a pull request up for others to look at ASAP.

I have a version of this based on #310 at 
https://github.com/revans2/zookeeper/tree/ZOOKEEPER-2845-orig-test-patch but 
the test itself is flaky.  Leader election frequently does not go as planned in 
the test, so it fails for reasons other than the servers ending up in an 
inconsistent state.

I am happy to answer any questions anyone has about the patch.

Author: Robert Evans <ev...@yahoo-inc.com>

Reviewers: Abraham Fine <af...@apache.org>, Mark Fenes <mfe...@cloudera.com>, 
Andor Molnár <an...@cloudera.com>, Kishor Patil <kpa...@yahoo-inc.com>

Closes #453 from revans2/ZOOKEEPER-2845-master and squashes the following 
commits:

28c074a26 [Robert Evans] Addressed review comments
583e34435 [Robert Evans] Using framework APIs for test
f26a21ad6 [Robert Evans] Addressed review comments
93168d716 [Robert Evans] Added in a modified version of the test
3d042f981 [Robert Evans] ZOOKEEPER-2845: Apply commit log when restarting 
server.


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/722ba940
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/722ba940
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/722ba940

Branch: refs/heads/master
Commit: 722ba9409a44a35d287aac803813f508cff2420a
Parents: f0b67b6
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Feb 23 14:49:00 2018 -0800
Committer: Abraham Fine <af...@apache.org>
Committed: Fri Feb 23 14:49:00 2018 -0800

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/ZKDatabase.java |  28 ++-
 .../zookeeper/server/ZooKeeperServer.java       |  24 ++-
 .../server/persistence/FileTxnSnapLog.java      |  16 ++
 .../server/quorum/QuorumPeerMainTest.java       | 186 +++++++++++++++----
 4 files changed, 209 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java 
b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 6679e78..a03c955 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -223,6 +223,11 @@ public class ZKDatabase {
         return sessionsWithTimeouts;
     }
 
+    private final PlayBackListener commitProposalPlaybackListener = new 
PlayBackListener() {
+        public void onTxnLoaded(TxnHeader hdr, Record txn){
+            addCommittedProposal(hdr, txn);
+        }
+    };
 
     /**
      * load the database from the disk onto memory and also add
@@ -231,18 +236,27 @@ public class ZKDatabase {
      * @throws IOException
      */
     public long loadDataBase() throws IOException {
-        PlayBackListener listener=new PlayBackListener(){
-            public void onTxnLoaded(TxnHeader hdr,Record txn){
-                Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, 
txn, hdr.getZxid());
-                addCommittedProposal(r);
-            }
-        };
+        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, 
commitProposalPlaybackListener);
+        initialized = true;
+        return zxid;
+    }
 
-        long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
+    /**
+     * Fast forward the database adding transactions from the committed log 
into memory.
+     * @return the last valid zxid.
+     * @throws IOException
+     */
+    public long fastForwardDataBase() throws IOException {
+        long zxid = snapLog.fastForwardFromEdits(dataTree, 
sessionsWithTimeouts, commitProposalPlaybackListener);
         initialized = true;
         return zxid;
     }
 
+    private void addCommittedProposal(TxnHeader hdr, Record txn) {
+        Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, 
hdr.getZxid());
+        addCommittedProposal(r);
+    }
+
     /**
      * maintains a list of last <i>committedLog</i>
      *  or so committed requests. This is used for

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java 
b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index c8cd72d..9099b2f 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -557,14 +557,24 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
             firstProcessor.shutdown();
         }
 
-        if (fullyShutDown && zkDb != null) {
-            zkDb.clear();
+        if (zkDb != null) {
+            if (fullyShutDown) {
+                zkDb.clear();
+            } else {
+                // else there is no need to clear the database
+                //  * When a new quorum is established we can still apply the 
diff
+                //    on top of the same zkDb data
+                //  * If we fetch a new snapshot from leader, the zkDb will be
+                //    cleared anyway before loading the snapshot
+                try {
+                    //This will fast forward the database to the latest 
recorded transactions
+                    zkDb.fastForwardDataBase();
+                } catch (IOException e) {
+                    LOG.error("Error updating DB", e);
+                    zkDb.clear();
+                }
+            }
         }
-        // else there is no need to clear the database
-        //  * When a new quorum is established we can still apply the diff
-        //    on top of the same zkDb data
-        //  * If we fetch a new snapshot from leader, the zkDb will be
-        //    cleared anyway before loading the snapshot
 
         unregisterJMX();
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java 
b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 3ca1781..8702bf3 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -239,6 +239,22 @@ public class FileTxnSnapLog {
                 return -1L;
             }
         }
+        return fastForwardFromEdits(dt, sessions, listener);
+    }
+
+    /**
+     * This function will fast forward the server database to have the latest
+     * transactions in it.  This is the same as restore, but only reads from
+     * the transaction logs and not restores from a snapshot.
+     * @param dt the datatree to write transactions to.
+     * @param sessions the sessions to be restored.
+     * @param listener the playback listener to run on the
+     * database transactions.
+     * @return the highest zxid restored.
+     * @throws IOException
+     */
+    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
+                                     PlayBackListener listener) throws 
IOException {
         TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
         long highestZxid = dt.lastProcessedZxid;
         TxnHeader hdr;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java 
b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 9c6bd3a..43c341a 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -39,6 +39,7 @@ import org.apache.log4j.WriterAppender;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -249,14 +250,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase 
{
         numServers = 3;
         servers = LaunchServers(numServers);
         String path = "/hzxidtest";
-        int leader = -1;
-
-        // find the leader
-        for (int i = 0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-                leader = i;
-            }
-        }
+        int leader = servers.findLeader();
 
         // make sure there is a leader
         Assert.assertTrue("There should be a leader", leader >= 0);
@@ -366,12 +360,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase 
{
           servers = LaunchServers(numServers, 500);
 
           // find the leader
-          int trueLeader = -1;
-          for (int i = 0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-              trueLeader = i;
-            }
-          }
+          int trueLeader = servers.findLeader();
           Assert.assertTrue("There should be a leader", trueLeader >= 0);
 
           // find a follower
@@ -435,12 +424,16 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
         while (zk.getState() != state) {
             if (iterations-- == 0) {
-                throw new RuntimeException("Waiting too long");
+                throw new RuntimeException("Waiting too long " + zk.getState() 
+ " != " + state);
             }
             Thread.sleep(500);
         }
     }
 
+    private void waitForAll(Servers servers, States state) throws 
InterruptedException {
+        waitForAll(servers.zk, state);
+    }
+
     private void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
         boolean someoneNotConnected = true;
@@ -465,6 +458,37 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase 
{
     private static class Servers {
         MainThread mt[];
         ZooKeeper zk[];
+        int[] clientPorts;
+
+        public void shutDownAllServers() throws InterruptedException {
+            for (MainThread t: mt) {
+                t.shutdown();
+            }
+        }
+
+        public void restartAllServersAndClients(Watcher watcher) throws 
IOException {
+            for (MainThread t : mt) {
+                if (!t.isAlive()) {
+                    t.start();
+                }
+            }
+            for (int i = 0; i < zk.length; i++) {
+                restartClient(i, watcher);
+            }
+        }
+
+        public void restartClient(int clientIndex, Watcher watcher) throws 
IOException {
+            zk[clientIndex] = new ZooKeeper("127.0.0.1:" + 
clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+        }
+
+        public int findLeader() {
+            for (int i = 0; i < mt.length; i++) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    return i;
+                }
+            }
+            return -1;
+        }
     }
 
 
@@ -474,7 +498,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
 
     /** * This is a helper function for launching a set of servers
         *
-        * @param numServers* @param tickTime A ticktime to pass to MainThread
+        * @param numServers the number of servers
+     * @param tickTime A ticktime to pass to MainThread
         * @return
         * @throws IOException
         * @throws InterruptedException
@@ -482,30 +507,28 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
        private Servers LaunchServers(int numServers, Integer tickTime) throws 
IOException, InterruptedException {
            int SERVER_COUNT = numServers;
            Servers svrs = new Servers();
-           final int clientPorts[] = new int[SERVER_COUNT];
+           svrs.clientPorts = new int[SERVER_COUNT];
            StringBuilder sb = new StringBuilder();
            for(int i = 0; i < SERVER_COUNT; i++) {
-               clientPorts[i] = PortAssignment.unique();
-               
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
+               svrs.clientPorts[i] = PortAssignment.unique();
+               
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
            }
            String quorumCfgSection = sb.toString();
 
-           MainThread mt[] = new MainThread[SERVER_COUNT];
-           ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+           svrs.mt = new MainThread[SERVER_COUNT];
+        svrs.zk = new ZooKeeper[SERVER_COUNT];
            for(int i = 0; i < SERVER_COUNT; i++) {
                if (tickTime != null) {
-                 mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, 
new HashMap<String, String>(), tickTime);
+                   svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection, new HashMap<String, String>(), tickTime);
             } else {
-              mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
+                   svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection);
             }
-               mt[i].start();
-               zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], 
ClientBase.CONNECTION_TIMEOUT, this);
+            svrs.mt[i].start();
+               svrs.restartClient(i, this);
            }
-  
-           waitForAll(zk, States.CONNECTED);
-  
-           svrs.mt = mt;
-           svrs.zk = zk;
+
+           waitForAll(svrs, States.CONNECTED);
+
            return svrs;
        }
 
@@ -673,7 +696,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
             + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
             + ":" +  electionPort2 + ";" + CLIENT_PORT_QP2;
-        
+
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
         q1.start();
@@ -888,4 +911,105 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
                 maxSessionTimeOut, quorumPeer.getMaxSessionTimeout());
     }
 
+    @Test
+    public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
+        final int LEADER_TIMEOUT_MS = 10_000;
+        // 1. start up server and wait for leader election to finish
+        ClientBase.setupTestEnv();
+        final int SERVER_COUNT = 3;
+        servers = LaunchServers(SERVER_COUNT);
+
+        waitForAll(servers, States.CONNECTED);
+
+        // we need to shutdown and start back up to make sure that the create 
session isn't the first transaction since
+        // that is rather innocuous.
+        servers.shutDownAllServers();
+        waitForAll(servers, States.CONNECTING);
+        servers.restartAllServersAndClients(this);
+        waitForAll(servers, States.CONNECTED);
+
+        // 2. kill all followers
+        int leader = servers.findLeader();
+        Map<Long, Proposal> outstanding =  
servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
+        // increase the tick time to delay the leader going to looking
+        servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+        LOG.warn("LEADER {}", leader);
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].shutdown();
+            }
+        }
+
+        // 3. start up the followers to form a new quorum
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].start();
+            }
+        }
+
+        // 4. wait one of the follower to be the new leader
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                // Recreate a client session since the previous session was 
not persisted.
+                servers.restartClient(i, this);
+                waitForOne(servers.zk[i], States.CONNECTED);
+            }
+        }
+
+        // 5. send a create request to old leader and make sure it's synced to 
disk,
+        //    which means it acked from itself
+        try {
+            servers.zk[leader].create("/zk" + leader, "zk".getBytes(), 
Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+            Assert.fail("create /zk" + leader + " should have failed");
+        } catch (KeeperException e) {
+        }
+
+        // just make sure that we actually did get it in process at the
+        // leader
+        Assert.assertEquals(1, outstanding.size());
+        Proposal p = outstanding.values().iterator().next();
+        Assert.assertEquals(OpCode.create, p.request.getHdr().getType());
+
+        // make sure it has a chance to write it to disk
+        int sleepTime = 0;
+        Long longLeader = new Long(leader);
+        while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
+            if (sleepTime > 2000) {
+                Assert.fail("Transaction not synced to disk within 1 second " 
+ p.qvAcksetPairs.get(0).getAckset()
+                    + " expected " + leader);
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+        }
+
+        // 6. wait for the leader to quit due to not enough followers and come 
back up as a part of the new quorum
+        LOG.info("Waiting for leader {} to timeout followers", leader);
+        sleepTime = 0;
+        Follower f = servers.mt[leader].main.quorumPeer.follower;
+        while (f == null || !f.isRunning()) {
+            if (sleepTime > LEADER_TIMEOUT_MS * 2) {
+                Assert.fail("Took too long for old leader to time out " + 
servers.mt[leader].main.quorumPeer.getPeerState());
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+            f = servers.mt[leader].main.quorumPeer.follower;
+        }
+
+        int newLeader = servers.findLeader();
+        // make sure a different leader was elected
+        Assert.assertNotEquals(leader, newLeader);
+
+        // 7. restart the previous leader to force it to replay the edits and 
possibly come up in a bad state
+        servers.mt[leader].shutdown();
+        servers.mt[leader].start();
+        waitForAll(servers, States.CONNECTED);
+
+        // 8. check the node exist in previous leader but not others
+        //    make sure everything is consistent
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertNull("server " + i + " should not have /zk" + leader, 
servers.zk[i].exists("/zk" + leader, false));
+        }
+    }
 }

Reply via email to