This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 91dcbce AuditorPeriodicCheckTest: add test for entries with failed bookie writes
91dcbce is described below
commit 91dcbce5f0d686f013efab3b76b42962a4546f86
Author: Samuel Just <[email protected]>
AuthorDate: Wed Jan 24 10:43:11 2018 -0800
AuditorPeriodicCheckTest: add test for entries with failed bookie writes
Validate that the auditor's periodic check will detect entries
with failed bookie writes and repair them.
Signed-off-by: Rithin <rithin.shetty@salesforce.com>
(bug W-3311952)(bug W-3311965)
Signed-off-by: Samuel Just <sjust@salesforce.com>
Author: Samuel Just <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1044 from athanatos/forupstream/tests/failedwrites
---
.../replication/AuditorPeriodicCheckTest.java | 116 +++++++++++++++++++++
1 file changed, 116 insertions(+)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 6baa5d9..a3b0b0d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
@@ -34,20 +35,25 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandleAdapter;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -348,4 +354,114 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
t.join();
assertFalse("Shouldn't have thrown exception", exceptionCaught.get());
}
+
+ private BookieSocketAddress
replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
+ int bookieIdx = -1;
+ Long entryId =
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
+ ArrayList<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
+ .getLedgerMetadata(lh).getEnsembles().get(entryId);
+
+ // Identify a bookie in the current ledger ensemble to be replaced
+ BookieSocketAddress replacedBookie = null;
+ for (int i = 0; i < numBookies; i++) {
+ if (curEnsemble.contains(bs.get(i).getLocalAddress())) {
+ bookieIdx = i;
+ replacedBookie = bs.get(i).getLocalAddress();
+ break;
+ }
+ }
+ assertNotEquals("Couldn't find ensemble bookie in bookie list", -1,
bookieIdx);
+
+ LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
+ ServerConfiguration conf = killBookie(bookieIdx);
+ Bookie writeFailingBookie = new Bookie(conf) {
+ @Override
+ public void addEntry(ByteBuf entry, WriteCallback cb,
+ Object ctx, byte[] masterKey)
+ throws IOException, BookieException {
+ try {
+ LOG.info("Failing write to entry ");
+ // sleep a bit so that writes to other bookies succeed
before
+ // the client hears about the failure on this bookie. If
the
+ // client gets ack-quorum number of acks first, it won't
care
+ // about any failures and won't reform the ensemble.
+ Thread.sleep(100);
+ throw new IOException();
+ } catch (InterruptedException ie) {
+ // ignore, only interrupted if shutting down,
+ // and an exception would spam the logs
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ bsConfs.add(conf);
+ bs.add(startBookie(conf, writeFailingBookie));
+ return replacedBookie;
+ }
+
+    /*
+     * Validates that the periodic ledger check will fix entries with a failed write.
+     */
+    @Test
+    public void testFailedWriteRecovery() throws Exception {
+        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(
+            bsConfs.get(0),
+            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        // keep replication off while the under-replicated state is being set up
+        underReplicationManager.disableLedgerReplication();
+
+        byte[] passwd = "passwd".getBytes();
+        LedgerHandle lh = bkc.createLedger(2, 2, 1, DigestType.CRC32, passwd);
+
+        // kill one of the bookies and replace it with one that rejects writes;
+        // this way we get into the under replication state
+        BookieSocketAddress replacedBookie = replaceBookieWithWriteFailingBookie(lh);
+
+        // Write a few entries; this should cause under replication
+        byte[] data = "foobar".getBytes();
+        lh.addEntry(data);
+        lh.addEntry(data);
+        lh.addEntry(data);
+
+        lh.close();
+
+        // enable under replication detection and wait for it to report
+        // under replicated ledger
+        underReplicationManager.enableLedgerReplication();
+        long underReplicatedLedger = -1;
+        for (int i = 0; i < 5; i++) {
+            underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
+            if (underReplicatedLedger != -1) {
+                break;
+            }
+            Thread.sleep(CHECK_INTERVAL * 1000);
+        }
+        assertEquals("Ledger should be under replicated", lh.getId(), underReplicatedLedger);
+
+        // now start the replication workers; they are always shut down, even if waiting fails,
+        // so they cannot keep running into later tests
+        List<ReplicationWorker> workers = new ArrayList<>();
+        try {
+            for (int i = 0; i < numBookies; i++) {
+                ReplicationWorker rw = new ReplicationWorker(
+                    zkc, bsConfs.get(i), NullStatsLogger.INSTANCE);
+                rw.start();
+                workers.add(rw);
+            }
+            // NOTE(review): presumably closing the manager releases the claim taken by
+            // pollLedgerToRereplicate() above so a worker can pick the ledger up - confirm.
+            underReplicationManager.close();
+
+            // Poll (bounded) for the ensemble change instead of sleeping a fixed 3s,
+            // which is flaky on loaded machines and needlessly slow otherwise.
+            long deadline = System.currentTimeMillis() + 30000;
+            while (ensemblesContain(lh.getId(), passwd, replacedBookie)
+                    && System.currentTimeMillis() < deadline) {
+                Thread.sleep(100);
+            }
+        } finally {
+            for (ReplicationWorker rw : workers) {
+                rw.shutdown();
+            }
+        }
+
+        // check that ensemble has changed and the bookie that rejected writes has
+        // been replaced in the ensemble
+        assertFalse("Ensemble hasn't been updated",
+            ensemblesContain(lh.getId(), passwd, replacedBookie));
+    }
+
+    /**
+     * Reports whether any ensemble in the ledger's metadata still contains {@code bookie}.
+     * The ledger is opened without triggering recovery and is always closed before returning.
+     *
+     * @param ledgerId id of the ledger to inspect
+     * @param passwd ledger password
+     * @param bookie bookie address to look for
+     * @return true if at least one ensemble contains {@code bookie}
+     */
+    private boolean ensemblesContain(long ledgerId, byte[] passwd, BookieSocketAddress bookie)
+            throws Exception {
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
+        try {
+            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e
+                    : LedgerHandleAdapter.getLedgerMetadata(readLh).getEnsembles().entrySet()) {
+                if (e.getValue().contains(bookie)) {
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            readLh.close();
+        }
+    }
}
--
To stop receiving notification emails like this one, please contact
[email protected].