Repository: zookeeper
Updated Branches:
  refs/heads/master e7ac12c95 -> e501d9cc6


ZOOKEEPER-3127: Fix potential data inconsistency caused by updating the last
processed zxid with a partially applied multi-op txn

Author: Fangmin Lyu <allen...@fb.com>

Reviewers: Benjamin Reed <br...@apache.org>, Michael Han <h...@apache.org>

Closes #606 from lvfangmin/ZOOKEEPER-3127


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e501d9cc
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e501d9cc
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e501d9cc

Branch: refs/heads/master
Commit: e501d9cc67fbaa6e825292fd838711259b6c9789
Parents: e7ac12c
Author: Fangmin Lyu <allen...@fb.com>
Authored: Wed Sep 5 13:35:38 2018 -0700
Committer: Michael Han <h...@apache.org>
Committed: Wed Sep 5 13:35:38 2018 -0700

----------------------------------------------------------------------
 .../org/apache/zookeeper/server/DataTree.java   |  53 +++--
 .../org/apache/zookeeper/server/ZKDatabase.java |   9 +-
 .../server/quorum/FuzzySnapshotRelatedTest.java | 212 +++++++++++++++++++
 .../server/quorum/QuorumPeerMainTest.java       |   6 +-
 4 files changed, 260 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/DataTree.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java 
b/src/java/main/org/apache/zookeeper/server/DataTree.java
index 9a4f1a7..e0e6661 100644
--- a/src/java/main/org/apache/zookeeper/server/DataTree.java
+++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
@@ -788,7 +788,11 @@ public class DataTree {
 
     public volatile long lastProcessedZxid = 0;
 
-    public ProcessTxnResult processTxn(TxnHeader header, Record txn)
+    public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
+        return this.processTxn(header, txn, false); // isSubTxn=false: a top-level txn, which advances lastProcessedZxid once fully applied
+    }
+
+    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean 
isSubTxn)
     {
         ProcessTxnResult rc = new ProcessTxnResult();
 
@@ -943,7 +947,7 @@ public class DataTree {
                         TxnHeader subHdr = new TxnHeader(header.getClientId(), 
header.getCxid(),
                                                          header.getZxid(), 
header.getTime(),
                                                          subtxn.getType());
-                        ProcessTxnResult subRc = processTxn(subHdr, record);
+                        ProcessTxnResult subRc = processTxn(subHdr, record, 
true);
                         rc.multiResult.add(subRc);
                         if (subRc.err != 0 && rc.err == 0) {
                             rc.err = subRc.err ;
@@ -961,22 +965,41 @@ public class DataTree {
                 LOG.debug("Failed: " + header + ":" + txn, e);
             }
         }
+
+
         /*
-         * A snapshot might be in progress while we are modifying the data
-         * tree. If we set lastProcessedZxid prior to making corresponding
-         * change to the tree, then the zxid associated with the snapshot
-         * file will be ahead of its contents. Thus, while restoring from
-         * the snapshot, the restore method will not apply the transaction
-         * for zxid associated with the snapshot file, since the restore
-         * method assumes that transaction to be present in the snapshot.
+         * Things we can only update after the whole txn is applied to data
+         * tree.
          *
-         * To avoid this, we first apply the transaction and then modify
-         * lastProcessedZxid.  During restore, we correctly handle the
-         * case where the snapshot contains data ahead of the zxid associated
-         * with the file.
+         * If we update the lastProcessedZxid with the first sub txn in a multi
+         * and a snapshot is in progress, the snapshot may be tagged with the
+         * multi's zxid while containing only part of the multi op.
+         *
+         * When loading a snapshot, only the txns after the zxid associated
+         * with the snapshot file are loaded, so the missing sub txns would
+         * never be applied, causing data inconsistency.
+         *
+         * To avoid this, we only update the lastProcessedZxid when the whole
+         * multi-op txn is applied to DataTree.
          */
-        if (rc.zxid > lastProcessedZxid) {
-            lastProcessedZxid = rc.zxid;
+        if (!isSubTxn) {
+            /*
+             * A snapshot might be in progress while we are modifying the data
+             * tree. If we set lastProcessedZxid prior to making corresponding
+             * change to the tree, then the zxid associated with the snapshot
+             * file will be ahead of its contents. Thus, while restoring from
+             * the snapshot, the restore method will not apply the transaction
+             * for zxid associated with the snapshot file, since the restore
+             * method assumes that transaction to be present in the snapshot.
+             *
+             * To avoid this, we first apply the transaction and then modify
+             * lastProcessedZxid.  During restore, we correctly handle the
+             * case where the snapshot contains data ahead of the zxid 
associated
+             * with the file.
+             */
+            if (rc.zxid > lastProcessedZxid) {
+                lastProcessedZxid = rc.zxid;
+            }
         }
 
         /*

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java 
b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 7b00715..04145cb 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -95,7 +95,7 @@ public class ZKDatabase {
      * @param snapLog the FileTxnSnapLog mapping this zkdatabase
      */
     public ZKDatabase(FileTxnSnapLog snapLog) {
-        dataTree = new DataTree();
+        dataTree = createDataTree();
         sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
         this.snapLog = snapLog;
 
@@ -138,7 +138,7 @@ public class ZKDatabase {
         /* to be safe we just create a new
          * datatree.
          */
-        dataTree = new DataTree();
+        dataTree = createDataTree();
         sessionsWithTimeouts.clear();
         WriteLock lock = logLock.writeLock();
         try {
@@ -644,4 +644,9 @@ public class ZKDatabase {
     public boolean removeWatch(String path, WatcherType type, Watcher watcher) 
{
         return dataTree.removeWatch(path, type, watcher); // delegates to the DataTree currently held by this database
     }
+
+    // visible for testing: tests override this to install a DataTree subclass
+    public DataTree createDataTree() { // called from the constructor, so an override runs before subclass fields are initialized and must not rely on them
+        return new DataTree();
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
 
b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
new file mode 100644
index 0000000..0e3b230
--- /dev/null
+++ 
b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.security.sasl.SaslException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test cases used to catch corner cases caused by fuzzy snapshots, i.e. snapshots taken while a transaction is still being applied to the DataTree.
+ */
+public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);
+
+    MainThread[] mt = null;
+    ZooKeeper[] zk = null;
+    int leaderId; // index of the leading server; stays -1 if none is found in setup()
+    int followerA; // index of the first non-leader server found in setup()
+
+    @Before
+    public void setup() throws Exception {
+        LOG.info("Start up a 3 server quorum");
+        final int ENSEMBLE_SERVERS = 3;
+        final int clientPorts[] = new int[ENSEMBLE_SERVERS];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        // start servers
+        mt = new MainThread[ENSEMBLE_SERVERS];
+        zk = new ZooKeeper[ENSEMBLE_SERVERS];
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+                    false) {
+                @Override
+                public TestQPMain getTestQPMain() { // every server uses the hooked CustomDataTree
+                    return new CustomizedQPMain();
+                }
+            };
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
+        LOG.info("all servers started");
+
+        leaderId = -1;
+        followerA = -1;
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leaderId = i;
+            } else if (followerA == -1) {
+                followerA = i;
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (mt != null) {
+            for (MainThread t: mt) {
+                t.shutdown();
+            }
+        }
+
+        if (zk != null) {
+            for (ZooKeeper z: zk) {
+                z.close();
+            }
+        }
+    }
+
+    @Test
+    public void testMultiOpConsistency() throws Exception {
+        LOG.info("Create a parent node");
+        final String path = "/testMultiOpConsistency";
+        createEmptyNode(zk[followerA], path);
+
+        LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
+        CustomDataTree dt =
+                (CustomDataTree) 
mt[followerA].main.quorumPeer.getZkDb().getDataTree();
+
+        final ZooKeeperServer zkServer = 
mt[followerA].main.quorumPeer.getActiveServer();
+
+        String node1 = path + "/1";
+        String node2 = path + "/2";
+
+        dt.addNodeCreateListener(node2, new NodeCreateListener() { // snapshot right before node2 is created, i.e. mid-way through the multi-op
+            @Override
+            public void process(String path) {
+                LOG.info("Take a snapshot");
+                zkServer.takeSnapshot(true);
+            }
+        });
+
+        LOG.info("Issue a multi op to create 2 nodes");
+        zk[followerA].multi(Arrays.asList(
+            Op.create(node1, node1.getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(node2, node2.getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT))
+        );
+
+        LOG.info("Restart the server");
+        mt[followerA].shutdown();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+
+        mt[followerA].start(); // presumably restores from the mid-multi snapshot taken above - TODO confirm
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
+
+        LOG.info("Make sure the node consistent with leader");
+        Assert.assertEquals(new String(zk[leaderId].getData(node2, null, 
null)),
+                new String(zk[followerA].getData(node2, null, null))); // node2 must survive the restart with the same data as on the leader
+    }
+
+    private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
+        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+    }
+
+    static interface NodeCreateListener {
+        public void process(String path);
+
+    }
+
+    static class CustomDataTree extends DataTree {
+        Map<String, NodeCreateListener> nodeCreateListeners =
+                new HashMap<String, NodeCreateListener>(); // NOTE(review): written by the test thread, presumably read on a server processing thread; consider ConcurrentHashMap - TODO confirm
+
+        @Override
+        public void createNode(final String path, byte data[], List<ACL> acl,
+                           long ephemeralOwner, int parentCVersion, long zxid,
+                           long time, Stat outputStat)
+                throws NoNodeException, NodeExistsException {
+            NodeCreateListener listener = nodeCreateListeners.get(path);
+            if (listener != null) {
+                listener.process(path); // runs before super.createNode, so this node is not in the tree yet
+            }
+            super.createNode(path, data, acl, ephemeralOwner, parentCVersion,
+                    zxid, time, outputStat);
+        }
+
+        public void addNodeCreateListener(String path, NodeCreateListener 
listener) {
+            nodeCreateListeners.put(path, listener);
+        }
+    }
+
+    static class CustomizedQPMain extends TestQPMain {
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new QuorumPeer() {
+                @Override
+                public void setZKDatabase(ZKDatabase database) {
+                    super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) { // ignores 'database'; assumes getTxnFactory() is already set - TODO confirm
+                        @Override
+                        public DataTree createDataTree() {
+                            return new CustomDataTree();
+                        }
+                    });
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java 
b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 5928ea9..d48ea04 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -452,7 +452,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         Assert.assertTrue("falseLeader never rejoins the quorum", 
foundFollowing);
     }
 
-    private void waitForOne(ZooKeeper zk, States state) throws 
InterruptedException {
+    public static void waitForOne(ZooKeeper zk, States state) throws 
InterruptedException {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
         while (zk.getState() != state) {
             if (iterations-- == 0) {
@@ -466,7 +466,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         waitForAll(servers.zk, state);
     }
 
-    private void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
+    public static void waitForAll(ZooKeeper[] zks, States state) throws 
InterruptedException {
         int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
         boolean someoneNotConnected = true;
         while (someoneNotConnected) {
@@ -487,7 +487,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
     }
 
-    private void logStates(ZooKeeper[] zks) {
+    public static void logStates(ZooKeeper[] zks) {
             StringBuilder sbBuilder = new StringBuilder("Connection States: 
{");
            for (int i = 0; i < zks.length; i++) {
                 sbBuilder.append(i + " : " + zks[i].getState() + ", ");

Reply via email to