Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 e40d4050b -> ffb81b9a6


ZOOKEEPER-2845: Apply commit log when restarting server.

This is the backport of pull request #453 to the 3.5 branch.

Author: Robert Evans <ev...@yahoo-inc.com>

Reviewers: Abraham Fine <af...@apache.org>, Mark Fenes <mfe...@cloudera.com>, Andor Molnár <an...@cloudera.com>, Kishor Patil <kpa...@yahoo-inc.com>

Closes #454 from revans2/ZOOKEEPER-2845-3.5


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/ffb81b9a
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/ffb81b9a
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/ffb81b9a

Branch: refs/heads/branch-3.5
Commit: ffb81b9a618c2e3bc0dcffb9c2d240224d49409f
Parents: e40d405
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Feb 23 14:52:45 2018 -0800
Committer: Abraham Fine <af...@apache.org>
Committed: Fri Feb 23 14:52:45 2018 -0800

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/ZKDatabase.java |  28 ++-
 .../zookeeper/server/ZooKeeperServer.java       |  24 ++-
 .../server/persistence/FileTxnSnapLog.java      |  16 ++
 .../server/quorum/QuorumPeerMainTest.java       | 181 ++++++++++++++++---
 4 files changed, 206 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ffb81b9a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 6679e78..a03c955 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -223,6 +223,11 @@ public class ZKDatabase {
         return sessionsWithTimeouts;
     }
 
+    private final PlayBackListener commitProposalPlaybackListener = new 
PlayBackListener() {
+        public void onTxnLoaded(TxnHeader hdr, Record txn){
+            addCommittedProposal(hdr, txn);
+        }
+    };
 
     /**
      * load the database from the disk onto memory and also add
@@ -231,18 +236,27 @@ public class ZKDatabase {
      * @throws IOException
      */
     public long loadDataBase() throws IOException {
-        PlayBackListener listener=new PlayBackListener(){
-            public void onTxnLoaded(TxnHeader hdr,Record txn){
-                Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, 
txn, hdr.getZxid());
-                addCommittedProposal(r);
-            }
-        };
+        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, 
commitProposalPlaybackListener);
+        initialized = true;
+        return zxid;
+    }
 
-        long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
+    /**
+     * Fast forward the database adding transactions from the committed log 
into memory.
+     * @return the last valid zxid.
+     * @throws IOException
+     */
+    public long fastForwardDataBase() throws IOException {
+        long zxid = snapLog.fastForwardFromEdits(dataTree, 
sessionsWithTimeouts, commitProposalPlaybackListener);
         initialized = true;
         return zxid;
     }
 
+    private void addCommittedProposal(TxnHeader hdr, Record txn) {
+        Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, 
hdr.getZxid());
+        addCommittedProposal(r);
+    }
+
     /**
      * maintains a list of last <i>committedLog</i>
      *  or so committed requests. This is used for

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ffb81b9a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index 5a8610e..76040df 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -552,14 +552,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             firstProcessor.shutdown();
         }
 
-        if (fullyShutDown && zkDb != null) {
-            zkDb.clear();
+        if (zkDb != null) {
+            if (fullyShutDown) {
+                zkDb.clear();
+            } else {
+                // else there is no need to clear the database
+                //  * When a new quorum is established we can still apply the 
diff
+                //    on top of the same zkDb data
+                //  * If we fetch a new snapshot from leader, the zkDb will be
+                //    cleared anyway before loading the snapshot
+                try {
+                    //This will fast forward the database to the latest 
recorded transactions
+                    zkDb.fastForwardDataBase();
+                } catch (IOException e) {
+                    LOG.error("Error updating DB", e);
+                    zkDb.clear();
+                }
+            }
         }
-        // else there is no need to clear the database
-        //  * When a new quorum is established we can still apply the diff
-        //    on top of the same zkDb data
-        //  * If we fetch a new snapshot from leader, the zkDb will be
-        //    cleared anyway before loading the snapshot
 
         unregisterJMX();
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ffb81b9a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index eef87a6..458f4a2 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -213,6 +213,22 @@ public class FileTxnSnapLog {
             /* return a zxid of zero, since we the database is empty */
             return 0;
         }
+        return fastForwardFromEdits(dt, sessions, listener);
+    }
+
+    /**
+     * This function will fast forward the server database to have the latest
+     * transactions in it.  This is the same as restore, but only reads from
+     * the transaction logs and not restores from a snapshot.
+     * @param dt the datatree to write transactions to.
+     * @param sessions the sessions to be restored.
+     * @param listener the playback listener to run on the
+     * database transactions.
+     * @return the highest zxid restored.
+     * @throws IOException
+     */
+    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
+                                     PlayBackListener listener) throws 
IOException {
         TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
         long highestZxid = dt.lastProcessedZxid;
         TxnHeader hdr;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ffb81b9a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index e6e613c..7a3dbdb 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -39,6 +39,7 @@ import org.apache.log4j.WriterAppender;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -249,14 +250,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         numServers = 3;
         servers = LaunchServers(numServers);
         String path = "/hzxidtest";
-        int leader = -1;
-
-        // find the leader
-        for (int i = 0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-                leader = i;
-            }
-        }
+        int leader = servers.findLeader();
 
         // make sure there is a leader
         Assert.assertTrue("There should be a leader", leader >= 0);
@@ -366,12 +360,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
           servers = LaunchServers(numServers, 500);
 
           // find the leader
-          int trueLeader = -1;
-          for (int i = 0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-              trueLeader = i;
-            }
-          }
+          int trueLeader = servers.findLeader();
           Assert.assertTrue("There should be a leader", trueLeader >= 0);
 
           // find a follower
@@ -435,12 +424,16 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
         while (zk.getState() != state) {
             if (iterations-- == 0) {
-                throw new RuntimeException("Waiting too long");
+                throw new RuntimeException("Waiting too long " + zk.getState() 
+ " != " + state);
             }
             Thread.sleep(500);
         }
     }
 
+    private void waitForAll(Servers servers, States state) throws 
InterruptedException {
+        waitForAll(servers.zk, state);
+    }
+
     private void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
         boolean someoneNotConnected = true;
@@ -465,6 +458,37 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     private static class Servers {
         MainThread mt[];
         ZooKeeper zk[];
+        int[] clientPorts;
+
+        public void shutDownAllServers() throws InterruptedException {
+            for (MainThread t: mt) {
+                t.shutdown();
+            }
+        }
+
+        public void restartAllServersAndClients(Watcher watcher) throws 
IOException {
+            for (MainThread t : mt) {
+                if (!t.isAlive()) {
+                    t.start();
+                }
+            }
+            for (int i = 0; i < zk.length; i++) {
+                restartClient(i, watcher);
+            }
+        }
+
+        public void restartClient(int clientIndex, Watcher watcher) throws 
IOException {
+            zk[clientIndex] = new ZooKeeper("127.0.0.1:" + 
clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+        }
+
+        public int findLeader() {
+            for (int i = 0; i < mt.length; i++) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    return i;
+                }
+            }
+            return -1;
+        }
     }
 
 
@@ -475,7 +499,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     /**
      * This is a helper function for launching a set of servers
      *
-     * @param numServers
+     * @param numServers the number of servers
      * @param tickTime A ticktime to pass to MainThread
      * @return
      * @throws IOException
@@ -484,30 +508,28 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     private Servers LaunchServers(int numServers, Integer tickTime) throws 
IOException, InterruptedException {
         int SERVER_COUNT = numServers;
         Servers svrs = new Servers();
-        final int clientPorts[] = new int[SERVER_COUNT];
+        svrs.clientPorts = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < SERVER_COUNT; i++) {
-            clientPorts[i] = PortAssignment.unique();
-            
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
+            svrs.clientPorts[i] = PortAssignment.unique();
+            
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
         }
         String quorumCfgSection = sb.toString();
 
-        MainThread mt[] = new MainThread[SERVER_COUNT];
-        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        svrs.mt = new MainThread[SERVER_COUNT];
+        svrs.zk = new ZooKeeper[SERVER_COUNT];
         for(int i = 0; i < SERVER_COUNT; i++) {
             if (tickTime != null) {
-              mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new 
HashMap<String, String>(), tickTime);
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection, new HashMap<String, String>(), tickTime);
             } else {
-              mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection);
             }
-            mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], 
ClientBase.CONNECTION_TIMEOUT, this);
+            svrs.mt[i].start();
+            svrs.restartClient(i, this);
         }
 
-        waitForAll(zk, States.CONNECTED);
+        waitForAll(svrs, States.CONNECTED);
 
-        svrs.mt = mt;
-        svrs.zk = zk;
         return svrs;
     }
 
@@ -675,7 +697,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
             + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
             + ":" +  electionPort2 + ";" + CLIENT_PORT_QP2;
-        
+
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
         q1.start();
@@ -890,4 +912,105 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                 maxSessionTimeOut, quorumPeer.getMaxSessionTimeout());
     }
 
+    @Test
+    public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
+        final int LEADER_TIMEOUT_MS = 10_000;
+        // 1. start up server and wait for leader election to finish
+        ClientBase.setupTestEnv();
+        final int SERVER_COUNT = 3;
+        servers = LaunchServers(SERVER_COUNT);
+
+        waitForAll(servers, States.CONNECTED);
+
+        // we need to shutdown and start back up to make sure that the create session isn't the first transaction since
+        // that is rather innocuous.
+        servers.shutDownAllServers();
+        waitForAll(servers, States.CONNECTING);
+        servers.restartAllServersAndClients(this);
+        waitForAll(servers, States.CONNECTED);
+
+        // 2. kill all followers
+        int leader = servers.findLeader();
+        Map<Long, Proposal> outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
+        // increase the tick time to delay the leader going to looking
+        servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+        LOG.warn("LEADER {}", leader);
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].shutdown();
+            }
+        }
+
+        // 3. start up the followers to form a new quorum
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].start();
+            }
+        }
+
+        // 4. wait for one of the followers to become the new leader
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                // Recreate a client session since the previous session was not persisted.
+                servers.restartClient(i, this);
+                waitForOne(servers.zk[i], States.CONNECTED);
+            }
+        }
+
+        // 5. send a create request to the old leader and make sure it's synced to disk,
+        //    which means the old leader acked the proposal itself
+        try {
+            servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+            Assert.fail("create /zk" + leader + " should have failed");
+        } catch (KeeperException expected) { // expected: the old leader lost its followers, so the create must not succeed
+        }
+
+        // just make sure that we actually did get it in process at the
+        // leader
+        Assert.assertEquals(1, outstanding.size());
+        Proposal p = outstanding.values().iterator().next();
+        Assert.assertEquals(OpCode.create, p.request.getHdr().getType());
+
+        // make sure it has a chance to write it to disk
+        int sleepTime = 0;
+        Long longLeader = Long.valueOf(leader);
+        while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
+            if (sleepTime > 2000) {
+                Assert.fail("Transaction not synced to disk within 2 seconds " + p.qvAcksetPairs.get(0).getAckset()
+                    + " expected " + leader);
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+        }
+
+        // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
+        LOG.info("Waiting for leader {} to timeout followers", leader);
+        sleepTime = 0;
+        Follower f = servers.mt[leader].main.quorumPeer.follower;
+        while (f == null || !f.isRunning()) {
+            if (sleepTime > LEADER_TIMEOUT_MS * 2) {
+                Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState());
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+            f = servers.mt[leader].main.quorumPeer.follower;
+        }
+
+        int newLeader = servers.findLeader();
+        // make sure a different leader was elected
+        Assert.assertNotEquals(leader, newLeader);
+
+        // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
+        servers.mt[leader].shutdown();
+        servers.mt[leader].start();
+        waitForAll(servers, States.CONNECTED);
+
+        // 8. make sure /zk<leader> exists on NO server: the failed create must not reappear on the
+        //    old leader after it replays its log, so every server stays consistent
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
+        }
+    }
 }

Reply via email to