This is an automated email from the ASF dual-hosted git repository. symat pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new e9dbd63 ZOOKEEPER-2307: ZooKeeper not starting because acceptedEpoch is less than the currentEpoch e9dbd63 is described below commit e9dbd6357e7c93fa5e31eb37062f4d17da179597 Author: Mohammad Arshad <ars...@apache.org> AuthorDate: Tue Apr 28 14:33:32 2020 +0000 ZOOKEEPER-2307: ZooKeeper not starting because acceptedEpoch is less than the currentEpoch Update acceptedEpoch and currentEpoch in file first then in memory. cherry-picked from the original commit (19d8567) and fixed to be compatible with branch-3.5 Author: Mohammad Arshad <arshadapache.org> Reviewers: andorapache.org Author: Mate Szalay-Beko <sy...@apache.org> Reviewers: Andor Molnar <an...@apache.org>, Mate Szalay-Beko <sy...@apache.org> Closes #1339 from symat/ZOOKEEPER-3810 --- .../apache/zookeeper/server/quorum/QuorumPeer.java | 7 +- .../server/quorum/EpochWriteFailureTest.java | 151 +++++++++++++++++++++ .../apache/zookeeper/server/quorum/Zab1_0Test.java | 24 +++- 3 files changed, 177 insertions(+), 5 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 40ea311..4a1fd34 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1876,7 +1876,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider * @param value the long value to write to the named file * @throws IOException if the file cannot be written atomically */ - private void writeLongToFile(String name, final long value) throws IOException { + // visibleForTest + void writeLongToFile(String name, final long value) throws IOException { File file = new File(logFactory.getSnapDir(), name); new AtomicFileWritingIdiom(file, new WriterStatement() { @Override @@ -1901,14 +1902,14 
@@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } public void setCurrentEpoch(long e) throws IOException { - currentEpoch = e; writeLongToFile(CURRENT_EPOCH_FILENAME, e); + currentEpoch = e; } public void setAcceptedEpoch(long e) throws IOException { - acceptedEpoch = e; writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); + acceptedEpoch = e; } public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EpochWriteFailureTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EpochWriteFailureTest.java new file mode 100644 index 0000000..7b111dc --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EpochWriteFailureTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +public class EpochWriteFailureTest extends QuorumPeerTestBase { + private static int SERVER_COUNT = 3; + private static int[] clientPorts = new int[SERVER_COUNT]; + private static MainThread[] mt = new MainThread[SERVER_COUNT]; + private static ZooKeeper zk; + + /* + * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2307 + * Expectation: when writing the accepted epoch to file fails during + * leader election, the server must not complete leader election and + * must not update the in-memory value of acceptedEpoch. + */ + @Test(timeout = 120000) + public void testAcceptedEpochWriteFailure() throws Exception { + StringBuilder sb = new StringBuilder(); + sb.append("admin.enableServer=false"); + sb.append("\n"); + String server; + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." 
+ i + "=127.0.0.1:" + PortAssignment.unique() + ":" + + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server); + sb.append("\n"); + } + String currentQuorumCfgSection = sb.toString(); + for (int i = 0; i < SERVER_COUNT - 1; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false); + mt[i].start(); + } + + // ensure two servers started + for (int i = 0; i < SERVER_COUNT - 1; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT)); + } + + CountdownWatcher watch1 = new CountdownWatcher(); + zk = new ZooKeeper("127.0.0.1:" + clientPorts[0], ClientBase.CONNECTION_TIMEOUT, + watch1); + watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + String data = "originalData"; + zk.create("/epochIssue", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + //initialize third server + mt[2] = new MainThread(2, clientPorts[2], currentQuorumCfgSection, false) { + + @Override + public TestQPMain getTestQPMain() { + return new MockTestQPMain(); + } + }; + + // This server is faulty: it fails while writing acceptedEpoch. 
+ mt[2].start(); + + /* + * Verify that problematic server does not start as acceptedEpoch update + * failure is injected and it keeps on trying to join the quorum + */ + + Assert.assertFalse("verify server 2 not started", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT / 2)); + + QuorumPeer quorumPeer = mt[2].getQuorumPeer(); + + Assert.assertEquals("acceptedEpoch must not have changed", 0, + quorumPeer.getAcceptedEpoch()); + Assert.assertEquals("currentEpoch must not have changed", 0, + quorumPeer.getCurrentEpoch()); + } + + static class CustomQuorumPeer extends QuorumPeer { + CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, + int electionAlg, long myid, int tickTime, int initLimit, int syncLimit) throws IOException { + super(quorumPeers, snapDir, logDir, clientPort, electionAlg, myid, tickTime, initLimit, syncLimit); + } + + @Override + protected void writeLongToFile(String name, long value) throws IOException { + // initial epoch writing should be successful + if (0 != value) { + throw new IOException("Input/output error"); + } + } + } + + private static class MockTestQPMain extends TestQPMain { + @Override + public void runFromConfig(QuorumPeerConfig config) + throws IOException { + quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), + config.getDataDir(), config.getDataLogDir(), + config.getClientPortAddress().getPort(), config.getElectionAlg(), + config.getServerId(), config.getTickTime(), config.getInitLimit(), + config.getSyncLimit()); + quorumPeer.start(); + try { + quorumPeer.join(); + } catch (InterruptedException e) { + LOG.warn("Quorum Peer interrupted", e); + } + } + } + + @AfterClass + public static void tearDownAfterClass() throws InterruptedException { + for (int i = 0; i < SERVER_COUNT; i++) { + if (mt[i] != null) { + mt[i].shutdown(); + } + } + if (zk != null) { + zk.close(); + } + } +} diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 922257b..754e40b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -60,6 +60,7 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.TestUtils; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTxn; @@ -870,7 +871,7 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertEquals(Leader.NEWLEADER, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, l.self.getAcceptedEpoch()); - Assert.assertEquals(1, l.self.getCurrentEpoch()); + assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT); qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); oa.writeRecord(qp, null); @@ -915,7 +916,7 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertEquals(Leader.NEWLEADER, qp.getType()); Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, l.self.getAcceptedEpoch()); - Assert.assertEquals(1, l.self.getCurrentEpoch()); + assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT); qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null); oa.writeRecord(qp, null); @@ -1268,4 +1269,23 @@ public class Zab1_0Test extends ZKTestCase { TestUtils.deleteFileRecursively(tmpDir); } } + + /* + * Epoch is first written to file then updated in memory. Give some time to + * write the epoch in file and then go for assert. 
+ */ + private void assertCurrentEpochGotUpdated(int expected, QuorumPeer self, long timeout) + throws IOException { + long elapsedTime = 0; + long waitInterval = 10; + while (self.getCurrentEpoch() != expected && elapsedTime < timeout) { + try { + Thread.sleep(waitInterval); + } catch (InterruptedException e) { + Assert.fail("CurrentEpoch update failed"); + } + elapsedTime = elapsedTime + waitInterval; + } + Assert.assertEquals("CurrentEpoch update failed", expected, self.getCurrentEpoch()); + } }