This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 381248d  ISSUE #1039: BKClient tests with BookieErrors
381248d is described below

commit 381248d64d878b1e970e4842598646c054d7a989
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Wed Jan 24 10:54:08 2018 -0800

    ISSUE #1039: BKClient tests with BookieErrors
    
    Descriptions of the changes in this PR:
    
    - end-to-end tests for validating BKClient readEntry
    calls incase of combination of bookie timeouts and
    data corruption
    
    Master Issue: #1039
    
    Author: Charan Reddy Guttapalem <[email protected]>
    Author: cguttapalem <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1040 from reddycharan/bookkeeperclienttests, closes #1039
---
 .../BookKeeperClientTestsWithBookieErrors.java     | 332 +++++++++++++++++++++
 1 file changed, 332 insertions(+)

diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientTestsWithBookieErrors.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientTestsWithBookieErrors.java
new file mode 100644
index 0000000..98f1961
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientTestsWithBookieErrors.java
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the bookkeeper client with errors from Bookies.
+ */
+public class BookKeeperClientTestsWithBookieErrors extends 
BookKeeperClusterTestCase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BookKeeperClientTestsWithBookieErrors.class);
+    private static final int NUM_BOOKIES = 3;
+    // The amount of sleeptime to sleep in injectSleepWhileRead fault injection
+    private final long sleepTime;
+    // Fault injection which would sleep for sleepTime before returning 
readEntry call
+    private final Consumer<ByteBuf> injectSleepWhileRead;
+    // Fault injection which would corrupt the entry data before returning 
readEntry call
+    private final Consumer<ByteBuf> injectCorruptData;
+    /*
+     * The ordered list of injections for the Bookies (LedgerStorage). The 
first
+     * bookie to get readEntry call will use the first faultInjection, and the
+     * second bookie to get readentry call will use the second one and so on..
+     *
+     * It is assumed that there would be faultInjection for each Bookie. So if
+     * there aren't NUM_BOOKIES num of faulInjections in this list then it will
+     * fail with NullPointerException
+     */
+    private static List<Consumer<ByteBuf>> faultInjections = new 
ArrayList<Consumer<ByteBuf>>();
+    /*
+     * This map is used for storing LedgerStorage and the corresponding
+     * faultInjection, according to the faultInjections list
+     */
+    private static HashMap<MockSortedLedgerStorage, Consumer<ByteBuf>> 
storageFaultInjectionsMap =
+            new HashMap<MockSortedLedgerStorage, Consumer<ByteBuf>>();
+    // Lock object for synchronizing injectCorruptData and faultInjections
+    private static final Object lock = new Object();
+
+    public BookKeeperClientTestsWithBookieErrors() {
+        super(NUM_BOOKIES);
+        
baseConf.setLedgerStorageClass(MockSortedLedgerStorage.class.getName());
+
+        // this fault injection will corrupt the entry data by modifying the 
last byte of the entry
+        injectCorruptData = (byteBuf) -> {
+            ByteBuffer buf = byteBuf.nioBuffer();
+            int lastByteIndex = buf.limit() - 1;
+            buf.put(lastByteIndex, (byte) (buf.get(lastByteIndex) - 1));
+        };
+
+        // this fault injection, will sleep for ReadEntryTimeout+2 secs before 
returning the readEntry call
+        sleepTime = (baseClientConf.getReadEntryTimeout() + 2) * 1000;
+        injectSleepWhileRead = (byteBuf) -> {
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+            }
+        };
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        faultInjections.clear();
+        storageFaultInjectionsMap.clear();
+        super.setUp();
+    }
+
+    // Mock SortedLedgerStorage to simulate Fault Injection
+    static class MockSortedLedgerStorage extends SortedLedgerStorage {
+        public MockSortedLedgerStorage() {
+            super();
+        }
+
+        @Override
+        public ByteBuf getEntry(long ledgerId, long entryId) throws 
IOException {
+            Consumer<ByteBuf> faultInjection;
+            synchronized (lock) {
+                faultInjection = storageFaultInjectionsMap.get(this);
+                if (faultInjection == null) {
+                    int readLedgerStorageIndex = 
storageFaultInjectionsMap.size();
+                    faultInjection = 
faultInjections.get(readLedgerStorageIndex);
+                    storageFaultInjectionsMap.put(this, faultInjection);
+                }
+            }
+            ByteBuf byteBuf = super.getEntry(ledgerId, entryId);
+            faultInjection.accept(byteBuf);
+            return byteBuf;
+        }
+    }
+
+    // In this testcase all the bookies will return corrupt entry
+    @Test(timeout = 60000)
+    public void testBookkeeperAllDigestErrors() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        // all the bookies need to return corrupt data
+        faultInjections.add(injectCorruptData);
+        faultInjections.add(injectCorruptData);
+        faultInjections.add(injectCorruptData);
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        try {
+            rlh.readEntries(4, 4);
+            fail("It is expected to fail with BKDigestMatchException");
+        } catch (BKException.BKDigestMatchException e) {
+        }
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase first two bookies will sleep (for ReadEntryTimeout+2 
secs) before returning the data,
+    // and the last one will return corrupt data
+    @Test(timeout = 60000)
+    public void testBKReadFirstTimeoutThenDigestError() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectCorruptData);
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        try {
+            rlh.readEntries(4, 4);
+            fail("It is expected to fail with BKDigestMatchException");
+        } catch (BKException.BKDigestMatchException e) {
+        }
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase first one will return corrupt data and the last two 
bookies will
+    // sleep (for ReadEntryTimeout+2 secs) before returning the data
+    @Test(timeout = 60000)
+    public void testBKReadFirstDigestErrorThenTimeout() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectCorruptData);
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectSleepWhileRead);
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        try {
+            rlh.readEntries(4, 4);
+            fail("It is expected to fail with BKDigestMatchException");
+        } catch (BKException.BKDigestMatchException e) {
+        }
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase first two bookies are killed before making the 
readentry call
+    // and the last one will return corrupt data
+    @Test(timeout = 60000)
+    public void testBKReadFirstBookiesDownThenDigestError() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectCorruptData);
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        wlh.addEntry("foobarfoo".getBytes());
+        wlh.close();
+
+        super.killBookie(0);
+        super.killBookie(1);
+
+        Thread.sleep(500);
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        try {
+            rlh.readEntries(0, 0);
+            fail("It is expected to fail with BKDigestMatchException");
+        } catch (BKException.BKDigestMatchException e) {
+        }
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase all the bookies will sleep (for ReadEntryTimeout+2 
secs) before returning the data
+    @Test(timeout = 60000)
+    public void testBKReadAllTimeouts() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectSleepWhileRead);
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        try {
+            rlh.readEntries(4, 4);
+            fail("It is expected to fail with BKTimeoutException");
+        } catch (BKException.BKTimeoutException e) {
+        }
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase first two bookies will sleep (for ReadEntryTimeout+2 
secs) before returning the data,
+    // but the last one will return as expected
+    @Test(timeout = 60000)
+    public void testBKReadTwoBookiesTimeout() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add(injectSleepWhileRead);
+        faultInjections.add((byteBuf) -> {
+        });
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        LedgerEntry entry = rlh.readEntries(4, 4).nextElement();
+        Assert.assertTrue("The read Entry should match with what have been 
written",
+                (new String(entry.getEntry())).equals("foobarfoo"));
+        rlh.close();
+        bkc.close();
+    }
+
+    // In this testcase first two bookies return the corrupt data,
+    // but the last one will return as expected
+    @Test(timeout = 60000)
+    public void testBKReadTwoBookiesWithDigestError() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] passwd = "AAAAAAA".getBytes();
+
+        faultInjections.add(injectCorruptData);
+        faultInjections.add(injectCorruptData);
+        faultInjections.add((byteBuf) -> {
+        });
+
+        LedgerHandle wlh = bkc.createLedger(3, 3, 2, DigestType.CRC32, passwd);
+        long id = wlh.getId();
+        for (int i = 0; i < 10; i++) {
+            wlh.addEntry("foobarfoo".getBytes());
+        }
+        wlh.close();
+
+        LedgerHandle rlh = bkc.openLedger(id, DigestType.CRC32, passwd);
+        LedgerEntry entry = rlh.readEntries(4, 4).nextElement();
+        Assert.assertTrue("The read Entry should match with what have been 
written",
+                (new String(entry.getEntry())).equals("foobarfoo"));
+        rlh.close();
+        bkc.close();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to