sijie closed pull request #1404: Issue 1403: ArrayIndexOutOfBoundsException is
thrown on readLastAddConfirmedAndEntry
URL: https://github.com/apache/bookkeeper/pull/1404
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 0e36faab1..2bd2a9962 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -141,6 +141,13 @@ public WriteSet copy() {
*/
WriteSet getWriteSet(long entryId);
+ /**
+ * Return the set of bookies indices to send the messages to for longpoll
reads.
+ *
+ * @param entryId expected next entry id to read.
+ * @return the set of bookies indices to read from.
+ */
+ WriteSet getWriteSetForLongPoll(long entryId);
/**
* An ack set represents the set of bookies from which
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 53276399f..b9888ba88 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,7 +81,7 @@
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.distributionSchedule.getWriteSet(eId);
+ this.writeSet =
lh.distributionSchedule.getWriteSetForLongPoll(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
@@ -435,7 +435,10 @@ boolean complete(int bookieIndex, BookieSocketAddress
host, ByteBuf buffer, long
this.lastAddConfirmed = lh.getLastAddConfirmed();
this.timeOutInMillis = timeOutInMillis;
this.numResponsesPending = 0;
- this.numEmptyResponsesAllowed =
getLedgerMetadata().getWriteQuorumSize()
+ // since long poll is effectively reading lac with waits, lac can be
potentially
+ // be advanced in different write quorums, so we need to make sure to
cover enough
+ // bookies before claiming lac is not advanced.
+ this.numEmptyResponsesAllowed = getLedgerMetadata().getEnsembleSize()
- getLedgerMetadata().getAckQuorumSize() + 1;
this.requestTimeNano = MathUtils.nowInNano();
this.scheduler = scheduler;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 220779bbe..e399b0191 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -53,6 +53,13 @@ public WriteSet getWriteSet(long entryId) {
return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
}
+ @Override
+ public WriteSet getWriteSetForLongPoll(long entryId) {
+ // for long poll reads, we are trying all the bookies in the ensemble
+ // so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
+ return WriteSetImpl.create(ensembleSize, ensembleSize /*
writeQuorumSize */, entryId);
+ }
+
@VisibleForTesting
static WriteSet writeSetFromValues(Integer... values) {
WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
diff --git
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
index 4e16c68ff..f2eff854e 100644
---
a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
+++
b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
@@ -22,19 +22,25 @@
import static org.junit.Assert.assertEquals;
import com.github.dockerjava.api.DockerClient;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.tests.BookKeeperClusterUtils;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
@@ -71,59 +77,72 @@ public void teardown() throws Exception {
@Test
public void testReadWrite() throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ int numEntries = 100;
try (BookKeeper bk = new BookKeeper(zookeeper)) {
long ledgerId;
try (LedgerHandle writelh =
bk.createLedger(BookKeeper.DigestType.CRC32C, PASSWD)) {
ledgerId = writelh.getId();
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < numEntries; i++) {
writelh.addEntry(("entry-" + i).getBytes());
}
}
- try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
- long lac = readlh.getLastAddConfirmed();
- int i = 0;
- Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- String readBack = new String(e.getEntry());
- assertEquals(readBack, "entry-" + i++);
- }
- assertEquals(i, 100);
- }
+ readEntries(bk, ledgerId, numEntries);
}
}
@Test
public void testReadWriteAdv() throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ int numEntries = 100;
try (BookKeeper bk = new BookKeeper(zookeeper)) {
long ledgerId;
try (LedgerHandle writelh = bk.createLedgerAdv(3, 3, 2,
BookKeeper.DigestType.CRC32C, PASSWD)) {
ledgerId = writelh.getId();
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < numEntries; i++) {
writelh.addEntry(i, ("entry-" + i).getBytes());
}
}
- try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
- long lac = readlh.getLastAddConfirmed();
- int i = 0;
- Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- String readBack = new String(e.getEntry());
- assertEquals(readBack, "entry-" + i++);
- }
- assertEquals(i, 100);
+ readEntries(bk, ledgerId, numEntries);
+ }
+ }
+
+ private static void readEntries(BookKeeper bk,
+ long ledgerId,
+ int numExpectedEntries) throws Exception {
+ try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
+ long lac = readlh.getLastAddConfirmed();
+ int i = 0;
+ Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ String readBack = new String(e.getEntry());
+ assertEquals(readBack, "entry-" + i++);
}
+ assertEquals(i, numExpectedEntries);
}
}
@Test
- public void testTailingReads() throws Exception {
+ public void testTailingReadsWithoutExplicitLac() throws Exception {
+ testTailingReads(100, 98, 0);
+ }
+
+ @Test
+ public void testTailingReadsWithExplicitLac() throws Exception {
+ testTailingReads(100, 99, 100);
+ }
+
+ private void testTailingReads(int numEntries,
+ long lastExpectedConfirmedEntryId,
+ int lacIntervalMs)
+ throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
- @Cleanup BookKeeper bk = new BookKeeper(zookeeper);
+ ClientConfiguration conf = new ClientConfiguration()
+ .setExplictLacInterval(lacIntervalMs)
+ .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+ @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
@Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C,
PASSWD);
@Cleanup("shutdown") ExecutorService writeExecutor =
Executors.newSingleThreadExecutor(
new
ThreadFactoryBuilder().setNameFormat("write-executor").build());
@@ -132,18 +151,18 @@ public void testTailingReads() throws Exception {
@Cleanup("shutdown") ExecutorService readExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("read-executor").build());
- int numEntries = 100;
CompletableFuture<Void> readFuture = new CompletableFuture<>();
CompletableFuture<Void> writeFuture = new CompletableFuture<>();
// start the read thread
readExecutor.submit(() -> {
- long lastExpectedConfirmedEntryId = numEntries - 2;
long nextEntryId = 0L;
try {
while (nextEntryId <= lastExpectedConfirmedEntryId) {
long lac = readLh.getLastAddConfirmed();
while (lac >= nextEntryId) {
+ log.info("Attempt to read entries : [{} - {}]",
+ nextEntryId, lac);
Enumeration<LedgerEntry> entries =
readLh.readEntries(nextEntryId, lac);
while (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
@@ -159,8 +178,16 @@ public void testTailingReads() throws Exception {
}
// refresh lac
- while (readLh.readLastConfirmed() < nextEntryId) {
+ readLh.readLastConfirmed();
+ while (readLh.getLastAddConfirmed() < nextEntryId) {
+ log.info("Refresh lac {}, next entry id = {}",
+ readLh.getLastAddConfirmed(), nextEntryId);
TimeUnit.MILLISECONDS.sleep(100L);
+
+ readLh.readLastConfirmed();
+ if (readLh.getLastAddConfirmed() < nextEntryId) {
+ readLh.readExplicitLastConfirmed();
+ }
}
}
FutureUtils.complete(readFuture, null);
@@ -172,6 +199,21 @@ public void testTailingReads() throws Exception {
});
// start the write thread
+ writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
+
+ // both write and read should be successful
+ result(readFuture);
+ result(writeFuture);
+
+ assertEquals(lastExpectedConfirmedEntryId,
readLh.getLastAddConfirmed());
+ assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
+ assertEquals(numEntries - 1, writeLh.getLastAddPushed());
+ }
+
+ private static void writeEntries(int numEntries,
+ LedgerHandle writeLh,
+ ExecutorService writeExecutor,
+ CompletableFuture<Void> writeFuture) {
writeExecutor.submit(() -> {
try {
for (int i = 0; i < 100; i++) {
@@ -184,12 +226,93 @@ public void testTailingReads() throws Exception {
writeFuture.completeExceptionally(e);
}
});
+ }
+
+ @Test
+ public void testLongTailingReadsWithoutExplicitLac() throws Exception {
+ testLongPollTailingReads(100, 98, 0);
+ }
+
+ @Test
+ public void testLongTailingReadsWithExplicitLac() throws Exception {
+ testLongPollTailingReads(100, 99, 100);
+ }
+
+ private void testLongPollTailingReads(int numEntries,
+ long lastExpectedConfirmedEntryId,
+ int lacIntervalMs)
+ throws Exception {
+ String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ ClientConfiguration conf = new ClientConfiguration()
+ .setExplictLacInterval(lacIntervalMs)
+ .setMetadataServiceUri("zk://" + zookeeper + "/ledgers");
+ @Cleanup BookKeeper bk = BookKeeper.forConfig(conf).build();
+ @Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C,
PASSWD);
+ @Cleanup("shutdown") ExecutorService writeExecutor =
Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("write-executor").build());
+
+ @Cleanup LedgerHandle readLh =
bk.openLedgerNoRecovery(writeLh.getId(), DigestType.CRC32C, PASSWD);
+ @Cleanup("shutdown") ScheduledExecutorService readExecutor =
Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("read-executor").build());
+
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+
+ // start the read thread
+ AtomicLong nextEntryId = new AtomicLong(0L);
+
+ Runnable readNextFunc = new Runnable() {
+
+ @Override
+ public void run() {
+ if (nextEntryId.get() > lastExpectedConfirmedEntryId) {
+ FutureUtils.complete(readFuture, null);
+ return;
+ }
+
+ Stopwatch readWatch = Stopwatch.createStarted();
+ log.info("Attempt to read next entry {} - lac {}",
nextEntryId.get(), readLh.getLastAddConfirmed());
+ readLh.asyncReadLastConfirmedAndEntry(nextEntryId.get(),
Long.MAX_VALUE / 2, false,
+ (rc, lastConfirmed, entry, ctx) -> {
+ log.info("Read return in {} ms : rc = {}, lac = {},
entry = {}",
+ readWatch.elapsed(TimeUnit.MILLISECONDS), rc,
lastConfirmed, entry);
+ if (Code.OK == rc) {
+ if (null != entry) {
+ log.info("Successfully read entry {} : {}",
+ entry.getEntryId(), new
String(entry.getEntry(), UTF_8));
+ if (entry.getEntryId() != nextEntryId.get()) {
+ log.error("Attempt to read entry {} but
received entry {}",
+ nextEntryId.get(), entry.getEntryId());
+ readFuture.completeExceptionally(
+
BKException.create(Code.UnexpectedConditionException));
+ return;
+ } else {
+ nextEntryId.incrementAndGet();
+ }
+ }
+ readExecutor.submit(this);
+ } else if (Code.NoSuchLedgerExistsException == rc) {
+ // the ledger hasn't been created yet.
+ readExecutor.schedule(this, 200,
TimeUnit.MILLISECONDS);
+ } else {
+ log.error("Failed to read entries : {}",
BKException.getMessage(rc));
+
readFuture.completeExceptionally(BKException.create(rc));
+ }
+ }, null);
+ }
+ };
+
+ readNextFunc.run();
+
+ // start the write thread
+ writeEntries(numEntries, writeLh, writeExecutor, writeFuture);
// both write and read should be successful
result(readFuture);
result(writeFuture);
- assertEquals(numEntries - 2, readLh.getLastAddConfirmed());
+ assertEquals(lastExpectedConfirmedEntryId + 1, nextEntryId.get());
+ assertEquals(lastExpectedConfirmedEntryId,
readLh.getLastAddConfirmed());
assertEquals(numEntries - 1, writeLh.getLastAddConfirmed());
assertEquals(numEntries - 1, writeLh.getLastAddPushed());
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services