This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 91dcbce  AuditorPeriodicCheckTest: add test for entries with failed 
bookie writes
91dcbce is described below

commit 91dcbce5f0d686f013efab3b76b42962a4546f86
Author: Samuel Just <[email protected]>
AuthorDate: Wed Jan 24 10:43:11 2018 -0800

    AuditorPeriodicCheckTest: add test for entries with failed bookie writes
    
    Validate that the auditor's periodic check will detect entries
    with failed bookie writes and repair them.
    
    Signed-off-by: Rithin <rithin.shettysalesforce.com>
    (bug W-3311952)(bug W-3311965)
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Author: Samuel Just <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1044 from athanatos/forupstream/tests/failedwrites
---
 .../replication/AuditorPeriodicCheckTest.java      | 116 +++++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 6baa5d9..a3b0b0d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
@@ -34,20 +35,25 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieAccessor;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandleAdapter;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -348,4 +354,114 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         t.join();
         assertFalse("Shouldn't have thrown exception", exceptionCaught.get());
     }
+
+    private BookieSocketAddress 
replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
+        int bookieIdx = -1;
+        Long entryId = 
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
+        ArrayList<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
+                .getLedgerMetadata(lh).getEnsembles().get(entryId);
+
+        // Identify a bookie in the current ledger ensemble to be replaced
+        BookieSocketAddress replacedBookie = null;
+        for (int i = 0; i < numBookies; i++) {
+            if (curEnsemble.contains(bs.get(i).getLocalAddress())) {
+                bookieIdx = i;
+                replacedBookie = bs.get(i).getLocalAddress();
+                break;
+            }
+        }
+        assertNotEquals("Couldn't find ensemble bookie in bookie list", -1, 
bookieIdx);
+
+        LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress());
+        ServerConfiguration conf = killBookie(bookieIdx);
+        Bookie writeFailingBookie = new Bookie(conf) {
+            @Override
+            public void addEntry(ByteBuf entry, WriteCallback cb,
+                             Object ctx, byte[] masterKey)
+                             throws IOException, BookieException {
+                try {
+                    LOG.info("Failing write to entry ");
+                    // sleep a bit so that writes to other bookies succeed 
before
+                    // the client hears about the failure on this bookie. If 
the
+                    // client gets ack-quorum number of acks first, it won't 
care
+                    // about any failures and won't reform the ensemble.
+                    Thread.sleep(100);
+                    throw new IOException();
+                } catch (InterruptedException ie) {
+                    // ignore, only interrupted if shutting down,
+                    // and an exception would spam the logs
+                    Thread.currentThread().interrupt();
+                }
+            }
+        };
+        bsConfs.add(conf);
+        bs.add(startBookie(conf, writeFailingBookie));
+        return replacedBookie;
+    }
+
+    /*
+     * Validates that the periodic ledger check will fix entries with a failed 
write.
+     */
+    @Test
+    public void testFailedWriteRecovery() throws Exception {
+        LedgerManagerFactory mFactory = 
LedgerManagerFactory.newLedgerManagerFactory(
+                bsConfs.get(0),
+                
RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerUnderreplicationManager underReplicationManager = 
mFactory.newLedgerUnderreplicationManager();
+        underReplicationManager.disableLedgerReplication();
+
+        LedgerHandle lh = bkc.createLedger(2, 2, 1, DigestType.CRC32, 
"passwd".getBytes());
+
+        // kill one of the bookies and replace it with one that rejects write;
+        // This way we get into the under replication state
+        BookieSocketAddress replacedBookie = 
replaceBookieWithWriteFailingBookie(lh);
+
+        // Write a few entries; this should cause under replication
+        byte[] data = "foobar".getBytes();
+        data = "foobar".getBytes();
+        lh.addEntry(data);
+        lh.addEntry(data);
+        lh.addEntry(data);
+
+        lh.close();
+
+        // enable under replication detection and wait for it to report
+        // under replicated ledger
+        underReplicationManager.enableLedgerReplication();
+        long underReplicatedLedger = -1;
+        for (int i = 0; i < 5; i++) {
+            underReplicatedLedger = 
underReplicationManager.pollLedgerToRereplicate();
+            if (underReplicatedLedger != -1) {
+                break;
+            }
+            Thread.sleep(CHECK_INTERVAL * 1000);
+        }
+        assertEquals("Ledger should be under replicated", lh.getId(), 
underReplicatedLedger);
+
+        // now start the replication workers
+        List<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
+        for (int i = 0; i < numBookies; i++) {
+            ReplicationWorker rw = new ReplicationWorker(
+                    zkc, bsConfs.get(i), NullStatsLogger.INSTANCE);
+            rw.start();
+            l.add(rw);
+        }
+        underReplicationManager.close();
+
+        // Wait for ensemble to change after replication
+        Thread.sleep(3000);
+        for (ReplicationWorker rw : l) {
+            rw.shutdown();
+        }
+
+        // check that ensemble has changed and the bookie that rejected writes 
has
+        // been replaced in the ensemble
+        LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32, 
"passwd".getBytes());
+        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : 
LedgerHandleAdapter.getLedgerMetadata(newLh).
+                getEnsembles().entrySet()) {
+            ArrayList<BookieSocketAddress> ensemble = e.getValue();
+            assertFalse("Ensemble hasn't been updated", 
ensemble.contains(replacedBookie));
+        }
+        newLh.close();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to