This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new 5442537 ISSUE #1403: ArrayIndexOutOfBoundsException is thrown on
readLastAddConfirmedAndEntry
5442537 is described below
commit 5442537cf34babc9ef2021d1c18cad6fb675ae52
Author: Sijie Guo <[email protected]>
AuthorDate: Tue May 15 11:12:54 2018 -0700
ISSUE #1403: ArrayIndexOutOfBoundsException is thrown on
readLastAddConfirmedAndEntry
Descriptions of the changes in this PR:
*Motivation*
There are two bugs in `ReadLastAddConfirmedAndEntry`:
1) A regression was introduced by #657. The long-poll read op is effectively
a long-poll read of the LAC. Since the LAC is stored across the ensemble, the
retry logic assumes it will attempt all the bookies in the ensemble. However,
#657 uses a write set of size `writeQuorumSize` for tracking those attempts,
which leads to the ArrayIndexOutOfBoundsException reported in #1403. The
integration tests added in this PR can easily reproduce this issue.
2) There was a bug in the retry logic for ledgers whose ensemble size is
larger than the write quorum size. When this happens, the op claims the LAC
has not advanced before it has attempted enough bookies in the ensemble, so the
client never learns that the LAC has advanced when using long-poll reads on
such ledgers. The integration tests added in this PR also catch this issue.
Disclaimer: Twitter uses long-poll reads but never uses `ensembleSize >
writeQuorumSize`, so this is not a problem for dlog users.
*Solution*
- Introduce a `getWriteSetForLongPoll` call that uses `ensembleSize` for
building the write set. This addresses problem 1).
- Fix the assignment of `numEmptyResponsesAllowed`, so that long-poll reads
work with `ensembleSize > writeQuorumSize`. This addresses problem 2).
- Add integration tests for long-poll reads.
At the same time, also add integration tests for normal tailing reads with
and without explicit LAC.
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1404 from sijie/longpoll_tests, closes #1403
(cherry picked from commit 92591733c8b778f63cd71a00b2c7ed2c69c41312)
Signed-off-by: Sijie Guo <[email protected]>
---
.../bookkeeper/client/DistributionSchedule.java | 7 +
.../client/ReadLastConfirmedAndEntryOp.java | 7 +-
.../client/RoundRobinDistributionSchedule.java | 7 +
.../bookkeeper/tests/integration/TestSmoke.java | 181 +++++++++++++++++----
4 files changed, 171 insertions(+), 31 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 0e36faa..2bd2a99 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -141,6 +141,13 @@ public interface DistributionSchedule {
*/
WriteSet getWriteSet(long entryId);
+ /**
+ * Return the set of bookies indices to send the messages to for longpoll
reads.
+ *
+ * @param entryId expected next entry id to read.
+ * @return the set of bookies indices to read from.
+ */
+ WriteSet getWriteSetForLongPoll(long entryId);
/**
* An ack set represents the set of bookies from which
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 5327639..b9888ba 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,7 +81,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.distributionSchedule.getWriteSet(eId);
+ this.writeSet =
lh.distributionSchedule.getWriteSetForLongPoll(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
@@ -435,7 +435,10 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
this.lastAddConfirmed = lh.getLastAddConfirmed();
this.timeOutInMillis = timeOutInMillis;
this.numResponsesPending = 0;
- this.numEmptyResponsesAllowed =
getLedgerMetadata().getWriteQuorumSize()
+ // since long poll is effectively reading lac with waits, lac can be
potentially
+ // be advanced in different write quorums, so we need to make sure to
cover enough
+ // bookies before claiming lac is not advanced.
+ this.numEmptyResponsesAllowed = getLedgerMetadata().getEnsembleSize()
- getLedgerMetadata().getAckQuorumSize() + 1;
this.requestTimeNano = MathUtils.nowInNano();
this.scheduler = scheduler;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 220779b..e399b01 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -53,6 +53,13 @@ class RoundRobinDistributionSchedule implements
DistributionSchedule {
return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
}
+ @Override
+ public WriteSet getWriteSetForLongPoll(long entryId) {
+ // for long poll reads, we are trying all the bookies in the ensemble
+ // so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
+ return WriteSetImpl.create(ensembleSize, ensembleSize /*
writeQuorumSize */, entryId);
+ }
+
@VisibleForTesting
static WriteSet writeSetFromValues(Integer... values) {
WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
index 4e16c68..f2eff85 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
@@ -22,19 +22,25 @@ import static
org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import com.github.dockerjava.api.DockerClient;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.tests.BookKeeperClusterUtils;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
@@ -71,59 +77,72 @@ public class TestSmoke {
@Test
public void testReadWrite() throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ int numEntries = 100;
try (BookKeeper bk = new BookKeeper(zookeeper)) {
long ledgerId;
try (LedgerHandle writelh =
bk.createLedger(BookKeeper.DigestType.CRC32C, PASSWD)) {
ledgerId = writelh.getId();
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < numEntries; i++) {
writelh.addEntry(("entry-" + i).getBytes());
}
}
- try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
- long lac = readlh.getLastAddConfirmed();
- int i = 0;
- Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- String readBack = new String(e.getEntry());
- assertEquals(readBack, "entry-" + i++);
- }
- assertEquals(i, 100);
- }
+ readEntries(bk, ledgerId, numEntries);
}
}
@Test
public void testReadWriteAdv() throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ int numEntries = 100;
try (BookKeeper bk = new BookKeeper(zookeeper)) {
long ledgerId;
try (LedgerHandle writelh = bk.createLedgerAdv(3, 3, 2,
BookKeeper.DigestType.CRC32C, PASSWD)) {
ledgerId = writelh.getId();
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < numEntries; i++) {
writelh.addEntry(i, ("entry-" + i).getBytes());
}
}
- try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
- long lac = readlh.getLastAddConfirmed();
- int i = 0;
- Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- String readBack = new String(e.getEntry());
- assertEquals(readBack, "entry-" + i++);
- }
- assertEquals(i, 100);
+ readEntries(bk, ledgerId, numEntries);
+ }
+ }
+
+ private static void readEntries(BookKeeper bk,
+ long ledgerId,
+ int numExpectedEntries) throws Exception {
+ try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
+ long lac = readlh.getLastAddConfirmed();
+ int i = 0;
+ Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ String readBack = new String(e.getEntry());
+ assertEquals(readBack, "entry-" + i++);
}
+ assertEquals(i, numExpectedEntries);
}
}
@Test
- public void testTailingReads() throws Exception {
+ public void testTailingReadsWithoutExplicitLac() throws Exception {
+ testTailingReads(100, 98, 0);
+ }
+
+ @Test
+ public void testTailingReadsWithExplicitLac() throws Exception {
+ testTailingReads(100, 99, 100);
+ }
+
+ private void testTailingReads(int numEntries,
+ long lastExpectedConfirmedEntryId,
+ int lacIntervalMs)
+ throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
- @Cleanup BookKeeper bk = new BookKeeper(zookeeper);
+ ClientConfiguration conf = new ClientConfiguration()
+ .setExplictLacInterval(lacIntervalMs)
+ .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+ @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
@Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C,
PASSWD);
@Cleanup("shutdown") ExecutorService writeExecutor =
Executors.newSingleThreadExecutor(
new
ThreadFactoryBuilder().setNameFormat("write-executor").build());
@@ -132,18 +151,18 @@ public class TestSmoke {
@Cleanup("shutdown") ExecutorService readExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("read-executor").build());
- int numEntries = 100;
CompletableFuture<Void> readFuture = new CompletableFuture<>();
CompletableFuture<Void> writeFuture = new CompletableFuture<>();
// start the read thread
readExecutor.submit(() -> {
- long lastExpectedConfirmedEntryId = numEntries - 2;
long nextEntryId = 0L;
try {
while (nextEntryId <= lastExpectedConfirmedEntryId) {
long lac = readLh.getLastAddConfirmed();
while (lac >= nextEntryId) {
+ log.info("Attempt to read entries : [{} - {}]",
+ nextEntryId, lac);
Enumeration<LedgerEntry> entries =
readLh.readEntries(nextEntryId, lac);
while (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
@@ -159,8 +178,16 @@ public class TestSmoke {
}
// refresh lac
- while (readLh.readLastConfirmed() < nextEntryId) {
+ readLh.readLastConfirmed();
+ while (readLh.getLastAddConfirmed() < nextEntryId) {
+ log.info("Refresh lac {}, next entry id = {}",
+ readLh.getLastAddConfirmed(), nextEntryId);
TimeUnit.MILLISECONDS.sleep(100L);
+
+ readLh.readLastConfirmed();
+ if (readLh.getLastAddConfirmed() < nextEntryId) {
+ readLh.readExplicitLastConfirmed();
+ }
}
}
FutureUtils.complete(readFuture, null);
@@ -172,6 +199,21 @@ public class TestSmoke {
});
// start the write thread
+ writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
+
+ // both write and read should be successful
+ result(readFuture);
+ result(writeFuture);
+
+ assertEquals(lastExpectedConfirmedEntryId,
readLh.getLastAddConfirmed());
+ assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
+ assertEquals(numEntries - 1, writeLh.getLastAddPushed());
+ }
+
+ private static void writeEntries(int numEntries,
+ LedgerHandle writeLh,
+ ExecutorService writeExecutor,
+ CompletableFuture<Void> writeFuture) {
writeExecutor.submit(() -> {
try {
for (int i = 0; i < 100; i++) {
@@ -184,12 +226,93 @@ public class TestSmoke {
writeFuture.completeExceptionally(e);
}
});
+ }
+
+ @Test
+ public void testLongTailingReadsWithoutExplicitLac() throws Exception {
+ testLongPollTailingReads(100, 98, 0);
+ }
+
+ @Test
+ public void testLongTailingReadsWithExplicitLac() throws Exception {
+ testLongPollTailingReads(100, 99, 100);
+ }
+
+ private void testLongPollTailingReads(int numEntries,
+ long lastExpectedConfirmedEntryId,
+ int lacIntervalMs)
+ throws Exception {
+ String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ ClientConfiguration conf = new ClientConfiguration()
+ .setExplictLacInterval(lacIntervalMs)
+ .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+ @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
+ @Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C,
PASSWD);
+ @Cleanup("shutdown") ExecutorService writeExecutor =
Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("write-executor").build());
+
+ @Cleanup LedgerHandle readLh =
bk.openLedgerNoRecovery(writeLh.getId(), DigestType.CRC32C, PASSWD);
+ @Cleanup("shutdown") ScheduledExecutorService readExecutor =
Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("read-executor").build());
+
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+
+ // start the read thread
+ AtomicLong nextEntryId = new AtomicLong(0L);
+
+ Runnable readNextFunc = new Runnable() {
+
+ @Override
+ public void run() {
+ if (nextEntryId.get() > lastExpectedConfirmedEntryId) {
+ FutureUtils.complete(readFuture, null);
+ return;
+ }
+
+ Stopwatch readWatch = Stopwatch.createStarted();
+ log.info("Attempt to read next entry {} - lac {}",
nextEntryId.get(), readLh.getLastAddConfirmed());
+ readLh.asyncReadLastConfirmedAndEntry(nextEntryId.get(),
Long.MAX_VALUE / 2, false,
+ (rc, lastConfirmed, entry, ctx) -> {
+ log.info("Read return in {} ms : rc = {}, lac = {},
entry = {}",
+ readWatch.elapsed(TimeUnit.MILLISECONDS), rc,
lastConfirmed, entry);
+ if (Code.OK == rc) {
+ if (null != entry) {
+ log.info("Successfully read entry {} : {}",
+ entry.getEntryId(), new
String(entry.getEntry(), UTF_8));
+ if (entry.getEntryId() != nextEntryId.get()) {
+ log.error("Attempt to read entry {} but
received entry {}",
+ nextEntryId.get(), entry.getEntryId());
+ readFuture.completeExceptionally(
+
BKException.create(Code.UnexpectedConditionException));
+ return;
+ } else {
+ nextEntryId.incrementAndGet();
+ }
+ }
+ readExecutor.submit(this);
+ } else if (Code.NoSuchLedgerExistsException == rc) {
+ // the ledger hasn't been created yet.
+ readExecutor.schedule(this, 200,
TimeUnit.MILLISECONDS);
+ } else {
+ log.error("Failed to read entries : {}",
BKException.getMessage(rc));
+
readFuture.completeExceptionally(BKException.create(rc));
+ }
+ }, null);
+ }
+ };
+
+ readNextFunc.run();
+
+ // start the write thread
+ writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
// both write and read should be successful
result(readFuture);
result(writeFuture);
- assertEquals(numEntries - 2, readLh.getLastAddConfirmed());
+ assertEquals(lastExpectedConfirmedEntryId + 1, nextEntryId.get());
+ assertEquals(lastExpectedConfirmedEntryId,
readLh.getLastAddConfirmed());
assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
assertEquals(numEntries - 1, writeLh.getLastAddPushed());
}
--
To stop receiving notification emails like this one, please contact
[email protected].