Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.4 0666919fe -> e0af6ed75


ZOOKEEPER-2845: Apply commit log when restarting server.

This is the backport of #453 to the 3.4 branch.

Author: Robert Evans <ev...@yahoo-inc.com>

Reviewers: Abraham Fine <af...@apache.org>, Mark Fenes <mfe...@cloudera.com>, 
Andor Molnár <an...@cloudera.com>, Kishor Patil <kpa...@yahoo-inc.com>

Closes #455 from revans2/ZOOKEEPER-2845-3.4


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e0af6ed7
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e0af6ed7
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e0af6ed7

Branch: refs/heads/branch-3.4
Commit: e0af6ed7598fc4555d7625ddc8efd86e2281babf
Parents: 0666919
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Feb 23 15:18:55 2018 -0800
Committer: Abraham Fine <af...@apache.org>
Committed: Fri Feb 23 15:18:55 2018 -0800

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/ZKDatabase.java |  41 ++-
 .../zookeeper/server/ZooKeeperServer.java       |  24 +-
 .../server/persistence/FileTxnSnapLog.java      |  16 ++
 .../server/quorum/QuorumPeerMainTest.java       | 257 ++++++++++++++-----
 4 files changed, 254 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 72357b7..ce36422 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -201,7 +201,12 @@ public class ZKDatabase {
         return sessionsWithTimeouts;
     }
 
-    
+    private final PlayBackListener commitProposalPlaybackListener = new 
PlayBackListener() {
+        public void onTxnLoaded(TxnHeader hdr, Record txn){
+            addCommittedProposal(hdr, txn);
+        }
+    };
+
     /**
      * load the database from the disk onto memory and also add 
      * the transactions to the committedlog in memory.
@@ -209,22 +214,30 @@ public class ZKDatabase {
      * @throws IOException
      */
     public long loadDataBase() throws IOException {
-        PlayBackListener listener=new PlayBackListener(){
-            public void onTxnLoaded(TxnHeader hdr,Record txn){
-                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
-                        null, null);
-                r.txn = txn;
-                r.hdr = hdr;
-                r.zxid = hdr.getZxid();
-                addCommittedProposal(r);
-            }
-        };
-        
-        long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
+        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, 
commitProposalPlaybackListener);
         initialized = true;
         return zxid;
     }
-    
+
+    /**
+     * Fast forward the database adding transactions from the committed log 
into memory.
+     * @return the last valid zxid.
+     * @throws IOException
+     */
+    public long fastForwardDataBase() throws IOException {
+        long zxid = snapLog.fastForwardFromEdits(dataTree, 
sessionsWithTimeouts, commitProposalPlaybackListener);
+        initialized = true;
+        return zxid;
+    }
+
+    private void addCommittedProposal(TxnHeader hdr, Record txn) {
+        Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, 
null);
+        r.txn = txn;
+        r.hdr = hdr;
+        r.zxid = hdr.getZxid();
+        addCommittedProposal(r);
+    }
+
     /**
      * maintains a list of last <i>committedLog</i>
      *  or so committed requests. This is used for

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index cbf5fa5..977adc2 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -512,14 +512,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             firstProcessor.shutdown();
         }
 
-        if (fullyShutDown && zkDb != null) {
-            zkDb.clear();
+        if (zkDb != null) {
+            if (fullyShutDown) {
+                zkDb.clear();
+            } else {
+                // else there is no need to clear the database
+                //  * When a new quorum is established we can still apply the 
diff
+                //    on top of the same zkDb data
+                //  * If we fetch a new snapshot from leader, the zkDb will be
+                //    cleared anyway before loading the snapshot
+                try {
+                    //This will fast forward the database to the latest 
recorded transactions
+                    zkDb.fastForwardDataBase();
+                } catch (IOException e) {
+                    LOG.error("Error updating DB", e);
+                    zkDb.clear();
+                }
+            }
         }
-        // else there is no need to clear the database
-        //  * When a new quorum is established we can still apply the diff
-        //    on top of the same zkDb data
-        //  * If we fetch a new snapshot from leader, the zkDb will be
-        //    cleared anyway before loading the snapshot
 
         unregisterJMX();
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index b261a8e..0ba8491 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -168,6 +168,22 @@ public class FileTxnSnapLog {
     public long restore(DataTree dt, Map<Long, Integer> sessions, 
             PlayBackListener listener) throws IOException {
         snapLog.deserialize(dt, sessions);
+        return fastForwardFromEdits(dt, sessions, listener);
+    }
+
+    /**
+     * This function will fast forward the server database to have the latest
+     * transactions in it.  This is the same as restore, but only reads from
+     * the transaction logs and not restores from a snapshot.
+     * @param dt the datatree to write transactions to.
+     * @param sessions the sessions to be restored.
+     * @param listener the playback listener to run on the
+     * database transactions.
+     * @return the highest zxid restored.
+     * @throws IOException
+     */
+    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
+                                     PlayBackListener listener) throws 
IOException {
         FileTxnLog txnLog = new FileTxnLog(dataDir);
         TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
         long highestZxid = dt.lastProcessedZxid;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 0399c97..cd09a36 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -50,6 +50,7 @@ import org.apache.log4j.WriterAppender;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -254,15 +255,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         numServers = 3;
         servers = LaunchServers(numServers);
         String path = "/hzxidtest";
-        int leader=-1;
+        int leader = servers.findLeader();
 
-        // find the leader
-        for (int i=0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-                leader = i;
-            }
-        }
-        
         // make sure there is a leader
         Assert.assertTrue("There should be a leader", leader >=0);
 
@@ -371,12 +365,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
           servers = LaunchServers(numServers, 500);
 
           // find the leader
-          int trueLeader = -1;
-          for (int i = 0; i < numServers; i++) {
-            if (servers.mt[i].main.quorumPeer.leader != null) {
-              trueLeader = i;
-            }
-          }
+          int trueLeader = servers.findLeader();
           Assert.assertTrue("There should be a leader", trueLeader >= 0);
 
           // find a follower
@@ -437,27 +426,36 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     }
 
     private void waitForOne(ZooKeeper zk, States state) throws 
InterruptedException {
-       while(zk.getState() != state) {
-               Thread.sleep(500);
-       }
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
+        while (zk.getState() != state) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long " + zk.getState() 
+ " != " + state);
+            }
+            Thread.sleep(500);
+        }
     }
 
-       private void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
-               int iterations = 10;
-               boolean someoneNotConnected = true;
-        while(someoneNotConnected) {
-               if (iterations-- == 0) {
-                       ClientBase.logAllStackTraces();
-                       throw new RuntimeException("Waiting too long");
-               }
-               
-               someoneNotConnected = false;
-               for(ZooKeeper zk: zks) {
-                       if (zk.getState() != state) {
-                               someoneNotConnected = true;
-                       }
-               }
-               Thread.sleep(1000);
+    private void waitForAll(Servers servers, States state) throws 
InterruptedException {
+        waitForAll(servers.zk, state);
+    }
+
+    private void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
+        boolean someoneNotConnected = true;
+        while (someoneNotConnected) {
+            if (iterations-- == 0) {
+                ClientBase.logAllStackTraces();
+                throw new RuntimeException("Waiting too long");
+            }
+
+            someoneNotConnected = false;
+            for (ZooKeeper zk : zks) {
+                if (zk.getState() != state) {
+                    someoneNotConnected = true;
+                    break;
+                }
+            }
+            Thread.sleep(1000);
         }
        }
 
@@ -465,48 +463,79 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     private class Servers {
         MainThread mt[];
         ZooKeeper zk[];
+        int[] clientPorts;
+
+        public void shutDownAllServers() throws InterruptedException {
+            for (MainThread t: mt) {
+                t.shutdown();
+            }
+        }
+
+        public void restartAllServersAndClients(Watcher watcher) throws 
IOException {
+            for (MainThread t : mt) {
+                if (!t.isAlive()) {
+                    t.start();
+                }
+            }
+            for (int i = 0; i < zk.length; i++) {
+                restartClient(i, watcher);
+            }
+        }
+
+        public void restartClient(int clientIndex, Watcher watcher) throws 
IOException {
+            zk[clientIndex] = new ZooKeeper("127.0.0.1:" + 
clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+        }
+
+        public int findLeader() {
+            for (int i = 0; i < mt.length; i++) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    return i;
+                }
+            }
+            return -1;
+        }
     }
 
     private Servers LaunchServers(int numServers) throws IOException, 
InterruptedException {
            return LaunchServers(numServers, null);
     }
 
-    /** * This is a helper function for launching a set of servers
+    /**
+     * This is a helper function for launching a set of servers
      *
-     * @param numServers* @param tickTime A ticktime to pass to MainThread
+     * @param numServers the number of servers
+     * @param tickTime A ticktime to pass to MainThread
      * @return
      * @throws IOException
      * @throws InterruptedException
      */
     private Servers LaunchServers(int numServers, Integer tickTime) throws 
IOException, InterruptedException {
-           int SERVER_COUNT = numServers;
-           Servers svrs = new Servers();
-           final int clientPorts[] = new int[SERVER_COUNT];
-           StringBuilder sb = new StringBuilder();
-           for(int i = 0; i < SERVER_COUNT; i++) {
-               clientPorts[i] = PortAssignment.unique();
-               
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
-           }
-           String quorumCfgSection = sb.toString();
-
-           MainThread mt[] = new MainThread[SERVER_COUNT];
-           ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
-           for (int i = 0; i < SERVER_COUNT; i++) {
-               if (tickTime != null) {
-                       mt[i] = new MainThread(i, clientPorts[i], 
quorumCfgSection, new HashMap<String, String>(), tickTime);
-               } else {
-                   mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
-               }
-               mt[i].start();
-               zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], 
ClientBase.CONNECTION_TIMEOUT, this);
-           }
-
-           waitForAll(zk, States.CONNECTED);
-
-           svrs.mt = mt;
-           svrs.zk = zk;
-           return svrs;
-       }
+        int SERVER_COUNT = numServers;
+        Servers svrs = new Servers();
+        svrs.clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            svrs.clientPorts[i] = PortAssignment.unique();
+            
sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+        }
+        String quorumCfgSection = sb.toString();
+
+        svrs.mt = new MainThread[SERVER_COUNT];
+        svrs.zk = new ZooKeeper[SERVER_COUNT];
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            if (tickTime != null) {
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection, new HashMap<String, String>(), tickTime);
+            } else {
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], 
quorumCfgSection);
+            }
+            svrs.mt[i].start();
+            svrs.restartClient(i, this);
+        }
+
+        waitForAll(svrs, States.CONNECTED);
+
+        return svrs;
+    }
 
 
     /**
@@ -1120,4 +1149,106 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             return qp;
         }
     }
+
+    @Test
+    public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
+        final int LEADER_TIMEOUT_MS = 10000;
+        // 1. start up server and wait for leader election to finish
+        ClientBase.setupTestEnv();
+        final int SERVER_COUNT = 3;
+        servers = LaunchServers(SERVER_COUNT);
+
+        waitForAll(servers, States.CONNECTED);
+
+        // we need to shutdown and start back up to make sure that the create session isn't the first transaction since
+        // that is rather innocuous.
+        servers.shutDownAllServers();
+        waitForAll(servers, States.CONNECTING);
+        servers.restartAllServersAndClients(this);
+        waitForAll(servers, States.CONNECTED);
+
+        // 2. kill all followers
+        int leader = servers.findLeader();
+        Map<Long, Proposal> outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
+        // increase the tick time to delay the leader going to looking
+        servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+        LOG.warn("LEADER " + leader);
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].shutdown();
+            }
+        }
+
+        // 3. start up the followers to form a new quorum
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].start();
+            }
+        }
+
+        // 4. wait for one of the followers to become the new leader
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                // Recreate a client session since the previous session was not persisted.
+                servers.restartClient(i, this);
+                waitForOne(servers.zk[i], States.CONNECTED);
+            }
+        }
+
+        // 5. send a create request to old leader and make sure it's synced to disk,
+        //    which means it acked from itself
+        try {
+            servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+            Assert.fail("create /zk" + leader + " should have failed");
+        } catch (KeeperException expected) { // expected: the old leader's former followers now belong to a new quorum
+        }
+
+        // just make sure that we actually did get it in process at the
+        // leader
+        Assert.assertEquals(1, outstanding.size());
+        Proposal p = outstanding.values().iterator().next();
+        Assert.assertEquals(OpCode.create, p.request.hdr.getType());
+
+        // make sure it has a chance to write it to disk (poll every 100ms, give up after ~2s)
+        int sleepTime = 0;
+        Long longLeader = Long.valueOf(leader);
+        while (!p.ackSet.contains(longLeader)) {
+            if (sleepTime > 2000) {
+                Assert.fail("Transaction not synced to disk within 2 seconds " + p.ackSet
+                    + " expected " + leader);
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+        }
+
+        // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
+        LOG.info("Waiting for leader " + leader + " to timeout followers");
+        sleepTime = 0;
+        Follower f = servers.mt[leader].main.quorumPeer.follower;
+        while (f == null || !f.isRunning()) {
+            if (sleepTime > LEADER_TIMEOUT_MS * 2) {
+                Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState());
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+            f = servers.mt[leader].main.quorumPeer.follower;
+        }
+
+        int newLeader = servers.findLeader();
+        // make sure a different leader was elected
+        Assert.assertTrue("expected a new leader, old " + leader + " new " + newLeader, leader != newLeader);
+
+        // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
+        servers.mt[leader].shutdown();
+        servers.mt[leader].start();
+        waitForAll(servers, States.CONNECTED);
+
+        // 8. make sure /zk<leader> exists on no server, not even the previous leader that logged it,
+        //    i.e. the failed create did not survive the restart and every server is consistent
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
+        }
+    }
 }

Reply via email to