This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 5442537  ISSUE #1403: ArrayIndexOutOfBoundsException is thrown on readLastAddConfirmedAndEntry
5442537 is described below

commit 5442537cf34babc9ef2021d1c18cad6fb675ae52
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 15 11:12:54 2018 -0700

    ISSUE #1403: ArrayIndexOutOfBoundsException is thrown on readLastAddConfirmedAndEntry
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    There are two bugs in `ReadLastConfirmedAndEntryOp`:
    
    1) A regression was introduced by #657. The long poll read op long-polls the
    LAC, and since the LAC is stored across the whole ensemble, the retry logic
    assumes it will attempt every bookie in the ensemble. However, #657 used a
    write set sized by `write-quorum-size` to track those attempts, which leads to
    the ArrayIndexOutOfBoundsException reported in #1403. The integration tests
    added in this PR reproduce this issue easily.
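    
    The failure mode, in a minimal stand-alone sketch (illustrative only: the
    array-based write set and the example sizes are assumptions, not the real
    `WriteSetImpl`):

        // Pre-fix behaviour, simplified: the write set is sized by the write
        // quorum, but the long poll retry loop walks the whole ensemble.
        public class WriteSetOverflowSketch {
            public static void main(String[] args) {
                int ensembleSize = 5;
                int writeQuorumSize = 2;
                long entryId = 42L;

                // write set built from the write quorum only
                int[] writeSet = new int[writeQuorumSize];
                for (int i = 0; i < writeQuorumSize; i++) {
                    writeSet[i] = (int) ((entryId + i) % ensembleSize);
                }

                // the retry logic assumes it can try every bookie in the ensemble
                for (int attempt = 0; attempt < ensembleSize; attempt++) {
                    // throws ArrayIndexOutOfBoundsException once attempt == writeQuorumSize
                    int bookieIndex = writeSet[attempt];
                    System.out.println("attempt " + attempt + " -> bookie " + bookieIndex);
                }
            }
        }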
    
    2) There was a bug in the retry logic for ledgers whose ensemble size is larger
    than the write quorum size. In that case the op claims the LAC has not advanced
    before it has attempted every bookie in the ensemble, so a client using long
    poll reads on such ledgers never learns that the LAC advanced. The integration
    tests added in this PR also catch this issue.
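    
    A back-of-the-envelope sketch of the `numEmptyResponsesAllowed` change (the
    ensemble/quorum sizes are assumed example values, not taken from the patch):

        public class EmptyResponsesSketch {
            public static void main(String[] args) {
                int ensembleSize = 5, writeQuorumSize = 2, ackQuorumSize = 2;
                // before the fix: gives up after a single empty response, even though
                // the advanced LAC may live on any of the other four bookies
                int before = writeQuorumSize - ackQuorumSize + 1;  // = 1
                // after the fix: covers enough of the ensemble before claiming the
                // LAC has not advanced
                int after = ensembleSize - ackQuorumSize + 1;      // = 4
                System.out.println("before fix: " + before + ", after fix: " + after);
            }
        }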
    
    Disclaimer: Twitter uses long poll reads but never uses
    `ensembleSize > writeQuorumSize`, so this is not a problem for dlog users.
    
    *Solution*
    
    - Introduce a `getWriteSetForLongPoll` call that uses `ensembleSize` when
      building the write set; this addresses problem 1).
    - Fix the assignment of `numEmptyResponsesAllowed` so that long poll reads
      work with `ensembleSize > writeQuorumSize`.
    - Add integration tests for long poll reads.
    
    At the same time, also add integration tests for normal tailing reads, with
    and without explicit LAC.
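    
    For reference, a minimal usage sketch of the long poll read path the new tests
    exercise (it mirrors the callback shape used in `TestSmoke` below; the handle,
    entry id, timeout and error handling are placeholders):

        // long-poll for the next entry: the callback fires once the LAC advances
        // past nextEntryId, or once the timeout elapses
        readLh.asyncReadLastConfirmedAndEntry(nextEntryId, 5000 /* timeout ms */, false,
            (rc, lastConfirmed, entry, ctx) -> {
                if (BKException.Code.OK == rc && entry != null) {
                    // entry.getEntryId() should equal nextEntryId; schedule the next poll
                } else if (BKException.Code.OK != rc) {
                    // surface the error to the caller
                }
            }, null);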
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>
    
    This closes #1404 from sijie/longpoll_tests, closes #1403
    
    (cherry picked from commit 92591733c8b778f63cd71a00b2c7ed2c69c41312)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../bookkeeper/client/DistributionSchedule.java    |   7 +
 .../client/ReadLastConfirmedAndEntryOp.java        |   7 +-
 .../client/RoundRobinDistributionSchedule.java     |   7 +
 .../bookkeeper/tests/integration/TestSmoke.java    | 181 +++++++++++++++++----
 4 files changed, 171 insertions(+), 31 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 0e36faa..2bd2a99 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -141,6 +141,13 @@ public interface DistributionSchedule {
      */
     WriteSet getWriteSet(long entryId);
 
+    /**
+     * Return the set of bookies indices to send the messages to for longpoll reads.
+     *
+     * @param entryId expected next entry id to read.
+     * @return the set of bookies indices to read from.
+     */
+    WriteSet getWriteSetForLongPoll(long entryId);
 
     /**
      * An ack set represents the set of bookies from which
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 5327639..b9888ba 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,7 +81,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSet(eId);
+            this.writeSet = lh.distributionSchedule.getWriteSetForLongPoll(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
                         lh.getBookiesHealthInfo(), writeSet.copy());
@@ -435,7 +435,10 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         this.lastAddConfirmed = lh.getLastAddConfirmed();
         this.timeOutInMillis = timeOutInMillis;
         this.numResponsesPending = 0;
-        this.numEmptyResponsesAllowed = getLedgerMetadata().getWriteQuorumSize()
+        // since long poll is effectively reading lac with waits, lac can be potentially
+        // be advanced in different write quorums, so we need to make sure to cover enough
+        // bookies before claiming lac is not advanced.
+        this.numEmptyResponsesAllowed = getLedgerMetadata().getEnsembleSize()
                 - getLedgerMetadata().getAckQuorumSize() + 1;
         this.requestTimeNano = MathUtils.nowInNano();
         this.scheduler = scheduler;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 220779b..e399b01 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -53,6 +53,13 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
         return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
     }
 
+    @Override
+    public WriteSet getWriteSetForLongPoll(long entryId) {
+        // for long poll reads, we are trying all the bookies in the ensemble
+        // so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
+        return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
+    }
+
     @VisibleForTesting
     static WriteSet writeSetFromValues(Integer... values) {
         WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
diff --git a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
index 4e16c68..f2eff85 100644
--- a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
+++ b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
@@ -22,19 +22,25 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 
 import com.github.dockerjava.api.DockerClient;
+import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Enumeration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.tests.BookKeeperClusterUtils;
 import org.jboss.arquillian.junit.Arquillian;
 import org.jboss.arquillian.test.api.ArquillianResource;
@@ -71,59 +77,72 @@ public class TestSmoke {
     @Test
     public void testReadWrite() throws Exception {
        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker);
+        int numEntries = 100;
         try (BookKeeper bk = new BookKeeper(zookeeper)) {
             long ledgerId;
            try (LedgerHandle writelh = bk.createLedger(BookKeeper.DigestType.CRC32C, PASSWD)) {
                 ledgerId = writelh.getId();
-                for (int i = 0; i < 100; i++) {
+                for (int i = 0; i < numEntries; i++) {
                     writelh.addEntry(("entry-" + i).getBytes());
                 }
             }
 
-            try (LedgerHandle readlh = bk.openLedger(ledgerId, BookKeeper.DigestType.CRC32C, PASSWD)) {
-                long lac = readlh.getLastAddConfirmed();
-                int i = 0;
-                Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
-                while (entries.hasMoreElements()) {
-                    LedgerEntry e = entries.nextElement();
-                    String readBack = new String(e.getEntry());
-                    assertEquals(readBack, "entry-" + i++);
-                }
-                assertEquals(i, 100);
-            }
+            readEntries(bk, ledgerId, numEntries);
         }
     }
 
     @Test
     public void testReadWriteAdv() throws Exception {
        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker);
+        int numEntries = 100;
         try (BookKeeper bk = new BookKeeper(zookeeper)) {
             long ledgerId;
            try (LedgerHandle writelh = bk.createLedgerAdv(3, 3, 2, BookKeeper.DigestType.CRC32C, PASSWD)) {
                 ledgerId = writelh.getId();
-                for (int i = 0; i < 100; i++) {
+                for (int i = 0; i < numEntries; i++) {
                     writelh.addEntry(i, ("entry-" + i).getBytes());
                 }
             }
 
-            try (LedgerHandle readlh = bk.openLedger(ledgerId, BookKeeper.DigestType.CRC32C, PASSWD)) {
-                long lac = readlh.getLastAddConfirmed();
-                int i = 0;
-                Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
-                while (entries.hasMoreElements()) {
-                    LedgerEntry e = entries.nextElement();
-                    String readBack = new String(e.getEntry());
-                    assertEquals(readBack, "entry-" + i++);
-                }
-                assertEquals(i, 100);
+            readEntries(bk, ledgerId, numEntries);
+        }
+    }
+
+    private static void readEntries(BookKeeper bk,
+                                    long ledgerId,
+                                    int numExpectedEntries) throws Exception {
+        try (LedgerHandle readlh = bk.openLedger(ledgerId, BookKeeper.DigestType.CRC32C, PASSWD)) {
+            long lac = readlh.getLastAddConfirmed();
+            int i = 0;
+            Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
+            while (entries.hasMoreElements()) {
+                LedgerEntry e = entries.nextElement();
+                String readBack = new String(e.getEntry());
+                assertEquals(readBack, "entry-" + i++);
             }
+            assertEquals(i, numExpectedEntries);
         }
     }
 
     @Test
-    public void testTailingReads() throws Exception {
+    public void testTailingReadsWithoutExplicitLac() throws Exception {
+        testTailingReads(100, 98, 0);
+    }
+
+    @Test
+    public void testTailingReadsWithExplicitLac() throws Exception {
+        testTailingReads(100, 99, 100);
+    }
+
+    private void testTailingReads(int numEntries,
+                                  long lastExpectedConfirmedEntryId,
+                                  int lacIntervalMs)
+            throws Exception {
        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker);
-        @Cleanup BookKeeper bk = new BookKeeper(zookeeper);
+        ClientConfiguration conf = new ClientConfiguration()
+            .setExplictLacInterval(lacIntervalMs)
+            .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+        @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
        @Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C, PASSWD);
         @Cleanup("shutdown") ExecutorService writeExecutor = 
Executors.newSingleThreadExecutor(
             new 
ThreadFactoryBuilder().setNameFormat("write-executor").build());
@@ -132,18 +151,18 @@ public class TestSmoke {
         @Cleanup("shutdown") ExecutorService readExecutor = 
Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder().setNameFormat("read-executor").build());
 
-        int numEntries = 100;
         CompletableFuture<Void> readFuture = new CompletableFuture<>();
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
 
         // start the read thread
         readExecutor.submit(() -> {
-            long lastExpectedConfirmedEntryId = numEntries - 2;
             long nextEntryId = 0L;
             try {
                 while (nextEntryId <= lastExpectedConfirmedEntryId) {
                     long lac = readLh.getLastAddConfirmed();
                     while (lac >= nextEntryId) {
+                        log.info("Attempt to read entries : [{} - {}]",
+                            nextEntryId, lac);
                        Enumeration<LedgerEntry> entries = readLh.readEntries(nextEntryId, lac);
                         while (entries.hasMoreElements()) {
                             LedgerEntry e = entries.nextElement();
@@ -159,8 +178,16 @@ public class TestSmoke {
                     }
 
                     // refresh lac
-                    while (readLh.readLastConfirmed() < nextEntryId) {
+                    readLh.readLastConfirmed();
+                    while (readLh.getLastAddConfirmed() < nextEntryId) {
+                        log.info("Refresh lac {}, next entry id = {}",
+                            readLh.getLastAddConfirmed(), nextEntryId);
                         TimeUnit.MILLISECONDS.sleep(100L);
+
+                        readLh.readLastConfirmed();
+                        if (readLh.getLastAddConfirmed() < nextEntryId) {
+                            readLh.readExplicitLastConfirmed();
+                        }
                     }
                 }
                 FutureUtils.complete(readFuture, null);
@@ -172,6 +199,21 @@ public class TestSmoke {
         });
 
         // start the write thread
+        writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
+
+        // both write and read should be successful
+        result(readFuture);
+        result(writeFuture);
+
+        assertEquals(lastExpectedConfirmedEntryId, readLh.getLastAddConfirmed());
+        assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
+        assertEquals(numEntries - 1, writeLh.getLastAddPushed());
+    }
+
+    private static void writeEntries(int numEntries,
+                                     LedgerHandle writeLh,
+                                     ExecutorService writeExecutor,
+                                     CompletableFuture<Void> writeFuture) {
         writeExecutor.submit(() -> {
             try {
                 for (int i = 0; i < 100; i++) {
@@ -184,12 +226,93 @@ public class TestSmoke {
                 writeFuture.completeExceptionally(e);
             }
         });
+    }
+
+    @Test
+    public void testLongTailingReadsWithoutExplicitLac() throws Exception {
+        testLongPollTailingReads(100, 98, 0);
+    }
+
+    @Test
+    public void testLongTailingReadsWithExplicitLac() throws Exception {
+        testLongPollTailingReads(100, 99, 100);
+    }
+
+    private void testLongPollTailingReads(int numEntries,
+                                          long lastExpectedConfirmedEntryId,
+                                          int lacIntervalMs)
+            throws Exception {
+        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker);
+        ClientConfiguration conf = new ClientConfiguration()
+            .setExplictLacInterval(lacIntervalMs)
+            .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+        @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
+        @Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C, PASSWD);
+        @Cleanup("shutdown") ExecutorService writeExecutor = 
Executors.newSingleThreadExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat("write-executor").build());
+
+        @Cleanup LedgerHandle readLh = bk.openLedgerNoRecovery(writeLh.getId(), DigestType.CRC32C, PASSWD);
+        @Cleanup("shutdown") ScheduledExecutorService readExecutor = 
Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("read-executor").build());
+
+        CompletableFuture<Void> readFuture = new CompletableFuture<>();
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+
+        // start the read thread
+        AtomicLong nextEntryId = new AtomicLong(0L);
+
+        Runnable readNextFunc = new Runnable() {
+
+            @Override
+            public void run() {
+                if (nextEntryId.get() > lastExpectedConfirmedEntryId) {
+                    FutureUtils.complete(readFuture, null);
+                    return;
+                }
+
+                Stopwatch readWatch = Stopwatch.createStarted();
+                log.info("Attempt to read next entry {} - lac {}", 
nextEntryId.get(), readLh.getLastAddConfirmed());
+                readLh.asyncReadLastConfirmedAndEntry(nextEntryId.get(), Long.MAX_VALUE / 2, false,
+                    (rc, lastConfirmed, entry, ctx) -> {
+                        log.info("Read return in {} ms : rc = {}, lac = {}, 
entry = {}",
+                            readWatch.elapsed(TimeUnit.MILLISECONDS), rc, lastConfirmed, entry);
+                        if (Code.OK == rc) {
+                            if (null != entry) {
+                                log.info("Successfully read entry {} : {}",
+                                    entry.getEntryId(), new String(entry.getEntry(), UTF_8));
+                                if (entry.getEntryId() != nextEntryId.get()) {
+                                    log.error("Attempt to read entry {} but 
received entry {}",
+                                        nextEntryId.get(), entry.getEntryId());
+                                    readFuture.completeExceptionally(
+                                        BKException.create(Code.UnexpectedConditionException));
+                                    return;
+                                } else {
+                                    nextEntryId.incrementAndGet();
+                                }
+                            }
+                            readExecutor.submit(this);
+                        } else if (Code.NoSuchLedgerExistsException == rc) {
+                            // the ledger hasn't been created yet.
+                            readExecutor.schedule(this, 200, TimeUnit.MILLISECONDS);
+                        } else {
+                            log.error("Failed to read entries : {}", 
BKException.getMessage(rc));
+                            readFuture.completeExceptionally(BKException.create(rc));
+                        }
+                    }, null);
+            }
+        };
+
+        readNextFunc.run();
+
+        // start the write thread
+        writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
 
         // both write and read should be successful
         result(readFuture);
         result(writeFuture);
 
-        assertEquals(numEntries - 2, readLh.getLastAddConfirmed());
+        assertEquals(lastExpectedConfirmedEntryId + 1, nextEntryId.get());
+        assertEquals(lastExpectedConfirmedEntryId, readLh.getLastAddConfirmed());
         assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
         assertEquals(numEntries - 1, writeLh.getLastAddPushed());
     }
