Repository: zookeeper Updated Branches: refs/heads/branch-3.5 f07a836e3 -> c164ee454
ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs (master) This is the master version of #157 Author: Robert (Bobby) Evans <[email protected]> Reviewers: Flavio Junqueira <[email protected]>, Edward Ribeiro <[email protected]>, Abraham Fine <[email protected]>, Michael Han <[email protected]> Closes #159 from revans2/ZOOKEEPER-2678-master and squashes the following commits: 69fbe19 [Robert (Bobby) Evans] ZOOKEEPER-2678: Addressed review comments a432642 [Robert (Bobby) Evans] ZOOKEEPER-2678: Improved test to verify snapshot times 742367e [Robert (Bobby) Evans] Addressed review comments f4c5b0e [Robert (Bobby) Evans] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/c164ee45 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/c164ee45 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/c164ee45 Branch: refs/heads/branch-3.5 Commit: c164ee454cd70b3e65212ea1367ec86bb16ec022 Parents: f07a836 Author: Robert (Bobby) Evans <[email protected]> Authored: Sat Feb 11 15:12:00 2017 -0800 Committer: Michael Han <[email protected]> Committed: Wed Feb 15 16:06:40 2017 -0800 ---------------------------------------------------------------------- .../zookeeper/server/ZooKeeperServer.java | 18 ++++++++-- .../zookeeper/server/ZooKeeperServerMain.java | 2 +- .../apache/zookeeper/server/quorum/Learner.java | 37 +++++++++++++------- .../zookeeper/server/quorum/Zab1_0Test.java | 29 ++++++++++++++- 4 files changed, 69 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c164ee45/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 06848d2..91b810c 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -525,7 +525,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return state == State.RUNNING; } - public synchronized void shutdown() { + public void shutdown() { + shutdown(false); + } + + /** + * Shut down the server instance + * @param fullyShutDown true if another server using the same database will not replace this one in the same process + */ + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; @@ -543,9 +551,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { if (firstProcessor != null) { firstProcessor.shutdown(); } - if (zkDb != null) { + + if (fullyShutDown && zkDb != null) { zkDb.clear(); } + // else there is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot unregisterJMX(); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c164ee45/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java index 5bfeed3..372c78a 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java @@ -167,7 +167,7 @@ public class ZooKeeperServerMain { secureCnxnFactory.join(); } if (zkServer.canShutdown()) { - zkServer.shutdown(); + zkServer.shutdown(true); } } catch (InterruptedException e) { // warn, but generally this is ok http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c164ee45/src/java/main/org/apache/zookeeper/server/quorum/Learner.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 9803197..f048da8 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -358,12 +358,16 @@ public class Learner { QuorumVerifier newLeaderQV = null; - readPacket(qp); + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + readPacket(qp); LinkedList<Long> packetsCommitted = new LinkedList<Long>(); LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); @@ -400,10 +404,13 @@ public class Learner { long lastQueued = 0; - // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. - boolean snapshotTaken = false; + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -440,7 +447,7 @@ public class Learner { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { + if (!writeToTxnLog) { if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); } else { @@ -479,8 +486,7 @@ public class Learner { } lastQueued = packet.hdr.getZxid(); } - - if (!snapshotTaken) { + if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -498,14 +504,15 @@ public class Learner { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { // true for the pre v1.0 case - zk.takeSnapshot(); + if (isPreZAB1_0) { + zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; - case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData()!=null && qp.getData().length > 1) { try { @@ -516,10 +523,14 @@ public class Learner { e.printStackTrace(); } } + + if (snapshotNeeded) { + zk.takeSnapshot(); + } - zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); - snapshotTaken = true; + writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c164ee45/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 37f6cc2..778ea1e 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -19,6 +19,9 @@ package org.apache.zookeeper.server.quorum; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; @@ -651,6 +654,8 @@ public class Zab1_0Test extends ZKTestCase { tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -693,6 +698,10 @@ public class Zab1_0Test extends ZKTestCase { oa.writeRecord(qp, null); zkDb.serializeSnapshot(oa); oa.writeString("BenWasHere", null); + Thread.sleep(10); //Give it some time to process the snap + //No Snapshot taken yet, the SNAP was applied in memory + verify(f.zk, never()).takeSnapshot(); + qp.setType(Leader.NEWLEADER); qp.setZxid(ZxidUtils.makeZxid(1, 0)); oa.writeRecord(qp, null); @@ -703,7 +712,8 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); - + //Make sure that we did take the snapshot now + verify(f.zk).takeSnapshot(); Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok @@ -779,6 +789,8 @@ public class Zab1_0Test extends ZKTestCase { tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -846,13 +858,28 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); + //Wait for the transactions to be written out. The thread that writes them out + // does not send anything back when it is done. + long start = System.currentTimeMillis(); + while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + } + Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + start = System.currentTimeMillis(); zkDb2.loadDataBase(); + while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + zkDb2.loadDataBase(); + } LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); + LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); + //Snapshot was never taken during very simple sync + verify(f.zk, never()).takeSnapshot(); } finally { TestUtils.deleteFileRecursively(tmpDir); }
