Repository: zookeeper Updated Branches: refs/heads/master e7ac12c95 -> e501d9cc6
ZOOKEEPER-3127: Fixing potential data inconsistency due to update last processed zxid with partial multi-op txn Author: Fangmin Lyu <allen...@fb.com> Reviewers: Benjamin Reed <br...@apache.org>, Michael Han <h...@apache.org> Closes #606 from lvfangmin/ZOOKEEPER-3127 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e501d9cc Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e501d9cc Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e501d9cc Branch: refs/heads/master Commit: e501d9cc67fbaa6e825292fd838711259b6c9789 Parents: e7ac12c Author: Fangmin Lyu <allen...@fb.com> Authored: Wed Sep 5 13:35:38 2018 -0700 Committer: Michael Han <h...@apache.org> Committed: Wed Sep 5 13:35:38 2018 -0700 ---------------------------------------------------------------------- .../org/apache/zookeeper/server/DataTree.java | 53 +++-- .../org/apache/zookeeper/server/ZKDatabase.java | 9 +- .../server/quorum/FuzzySnapshotRelatedTest.java | 212 +++++++++++++++++++ .../server/quorum/QuorumPeerMainTest.java | 6 +- 4 files changed, 260 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/DataTree.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java index 9a4f1a7..e0e6661 100644 --- a/src/java/main/org/apache/zookeeper/server/DataTree.java +++ b/src/java/main/org/apache/zookeeper/server/DataTree.java @@ -788,7 +788,11 @@ public class DataTree { public volatile long lastProcessedZxid = 0; - public ProcessTxnResult processTxn(TxnHeader header, Record txn) + public ProcessTxnResult processTxn(TxnHeader header, Record txn) { + return this.processTxn(header, txn, false); + } + + public ProcessTxnResult 
processTxn(TxnHeader header, Record txn, boolean isSubTxn) { ProcessTxnResult rc = new ProcessTxnResult(); @@ -943,7 +947,7 @@ public class DataTree { TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(), header.getZxid(), header.getTime(), subtxn.getType()); - ProcessTxnResult subRc = processTxn(subHdr, record); + ProcessTxnResult subRc = processTxn(subHdr, record, true); rc.multiResult.add(subRc); if (subRc.err != 0 && rc.err == 0) { rc.err = subRc.err ; @@ -961,22 +965,41 @@ public class DataTree { LOG.debug("Failed: " + header + ":" + txn, e); } } + + /* - * A snapshot might be in progress while we are modifying the data - * tree. If we set lastProcessedZxid prior to making corresponding - * change to the tree, then the zxid associated with the snapshot - * file will be ahead of its contents. Thus, while restoring from - * the snapshot, the restore method will not apply the transaction - * for zxid associated with the snapshot file, since the restore - * method assumes that transaction to be present in the snapshot. + * Things we can only update after the whole txn is applied to data + * tree. * - * To avoid this, we first apply the transaction and then modify - * lastProcessedZxid. During restore, we correctly handle the - * case where the snapshot contains data ahead of the zxid associated - * with the file. + * If we update the lastProcessedZxid with the first sub txn in multi + * and there is a snapshot in progress, it's possible that the zxid + * associated with the snapshot only include partial of the multi op. + * + * When loading snapshot, it will only load the txns after the zxid + * associated with snapshot file, which could cause data inconsistency + * due to missing sub txns. + * + * To avoid this, we only update the lastProcessedZxid when the whole + * multi-op txn is applied to DataTree. 
*/ - if (rc.zxid > lastProcessedZxid) { - lastProcessedZxid = rc.zxid; + if (!isSubTxn) { + /* + * A snapshot might be in progress while we are modifying the data + * tree. If we set lastProcessedZxid prior to making corresponding + * change to the tree, then the zxid associated with the snapshot + * file will be ahead of its contents. Thus, while restoring from + * the snapshot, the restore method will not apply the transaction + * for zxid associated with the snapshot file, since the restore + * method assumes that transaction to be present in the snapshot. + * + * To avoid this, we first apply the transaction and then modify + * lastProcessedZxid. During restore, we correctly handle the + * case where the snapshot contains data ahead of the zxid associated + * with the file. + */ + if (rc.zxid > lastProcessedZxid) { + lastProcessedZxid = rc.zxid; + } } /* http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/ZKDatabase.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 7b00715..04145cb 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -95,7 +95,7 @@ public class ZKDatabase { * @param snapLog the FileTxnSnapLog mapping this zkdatabase */ public ZKDatabase(FileTxnSnapLog snapLog) { - dataTree = new DataTree(); + dataTree = createDataTree(); sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); this.snapLog = snapLog; @@ -138,7 +138,7 @@ public class ZKDatabase { /* to be safe we just create a new * datatree. 
*/ - dataTree = new DataTree(); + dataTree = createDataTree(); sessionsWithTimeouts.clear(); WriteLock lock = logLock.writeLock(); try { @@ -644,4 +644,9 @@ public class ZKDatabase { public boolean removeWatch(String path, WatcherType type, Watcher watcher) { return dataTree.removeWatch(path, type, watcher); } + + // visible for testing + public DataTree createDataTree() { + return new DataTree(); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java new file mode 100644 index 0000000..0e3b230 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.sasl.SaslException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test cases used to catch corner cases due to fuzzy snapshot.
+ */
+public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);
+
+    MainThread[] mt = null;
+    ZooKeeper[] zk = null;
+    int leaderId;
+    int followerA;
+
+    @Before
+    public void setup() throws Exception {
+        LOG.info("Start up a 3 server quorum");
+        final int ENSEMBLE_SERVERS = 3;
+        final int clientPorts[] = new int[ENSEMBLE_SERVERS];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        // start servers
+        mt = new MainThread[ENSEMBLE_SERVERS];
+        zk = new ZooKeeper[ENSEMBLE_SERVERS];
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+                    false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new CustomizedQPMain();
+                }
+            };
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
+        LOG.info("all servers started");
+
+        leaderId = -1;
+        followerA = -1;
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leaderId = i;
+            } else if (followerA == -1) {
+                followerA = i;
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Slots may still be null if setup() failed part-way through.
+        for (int i = 0; mt != null && i < mt.length; i++) {
+            if (mt[i] != null) {
+                mt[i].shutdown();
+            }
+        }
+        for (int i = 0; zk != null && i < zk.length; i++) {
+            if (zk[i] != null) {
+                zk[i].close();
+            }
+        }
+    }
+
+    /** A snapshot taken between a multi-op's sub-txns must not lose the later ones on restart. */
+    @Test
+    public void testMultiOpConsistency() throws Exception {
+        LOG.info("Create a parent node");
+        final String path = "/testMultiOpConsistency";
+        createEmptyNode(zk[followerA], path);
+
+        LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
+        CustomDataTree dt = (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
+        final ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
+        final AtomicBoolean snapshotTaken = new AtomicBoolean(false);
+
+        String node1 = path + "/1";
+        String node2 = path + "/2";
+
+        dt.addNodeCreateListener(node2, new NodeCreateListener() {
+            @Override
+            public void process(String createdPath) {
+                LOG.info("Take a snapshot");
+                zkServer.takeSnapshot(true);
+                snapshotTaken.set(true);
+            }
+        });
+
+        LOG.info("Issue a multi op to create 2 nodes");
+        zk[followerA].multi(Arrays.asList(
+            Op.create(node1, node1.getBytes(),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(node2, node2.getBytes(),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
+        Assert.assertTrue("snapshot hook was not triggered", snapshotTaken.get());
+
+        LOG.info("Restart the server");
+        mt[followerA].shutdown();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+
+        mt[followerA].start();
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
+
+        LOG.info("Make sure the node consistent with leader");
+        Assert.assertEquals(new String(zk[leaderId].getData(node2, null, null)),
+                new String(zk[followerA].getData(node2, null, null)));
+    }
+
+    private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
+        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    interface NodeCreateListener {
+        void process(String path);
+    }
+
+    static class CustomDataTree extends DataTree {
+        // Written from the test thread but read by a server thread during txn processing.
+        final Map<String, NodeCreateListener> nodeCreateListeners = new ConcurrentHashMap<>();
+
+        @Override
+        public void createNode(final String path, byte data[], List<ACL> acl,
+                long ephemeralOwner, int parentCVersion, long zxid,
+                long time, Stat outputStat) throws NoNodeException, NodeExistsException {
+            NodeCreateListener listener = nodeCreateListeners.get(path);
+            if (listener != null) {
+                listener.process(path);
+            }
+            super.createNode(path, data, acl, ephemeralOwner, parentCVersion,
+                    zxid, time, outputStat);
+        }
+
+        public void addNodeCreateListener(String path, NodeCreateListener listener) {
+            nodeCreateListeners.put(path, listener);
+        }
+    }
+
+    static class CustomizedQPMain extends TestQPMain {
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new QuorumPeer() {
+                @Override
+                public void setZKDatabase(ZKDatabase database) {
+                    super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) {
+                        @Override
+                        public DataTree createDataTree() {
+                            return new CustomDataTree();
+                        }
+                    });
+                }
+            };
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 5928ea9..d48ea04 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -452,7 +452,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing); } - private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { + public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 500; while (zk.getState() != state) { if (iterations-- == 0) { @@ -466,7 +466,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { waitForAll(servers.zk, state); } - private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { + public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; boolean someoneNotConnected = true; while (someoneNotConnected) { @@ -487,7 +487,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } } - private void logStates(ZooKeeper[] zks) { + public static void logStates(ZooKeeper[] zks) { StringBuilder sbBuilder = new StringBuilder("Connection States: {"); for (int i = 0; i < zks.length; i++) { sbBuilder.append(i + " : " + zks[i].getState() + ", ");