This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 22097343ce [BP-62] LedgerHandle introduces batch read API. (#4195)
22097343ce is described below
commit 22097343ceed1312964ad077cdcd9fbd7e3e026b
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Feb 5 16:58:38 2024 +0800
[BP-62] LedgerHandle introduces batch read API. (#4195)
### Motivation
This is the fifth PR for the batch
read(https://github.com/apache/bookkeeper/pull/4051) feature.
LedgerHandle introduces batch read API.
This PR is based on #4190; please merge that one first.
---
.../bookkeeper/client/ClientInternalConf.java | 6 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 279 ++++++++++++++
.../apache/bookkeeper/client/api/ReadHandle.java | 32 ++
.../bookkeeper/conf/ClientConfiguration.java | 7 +
.../proto/BatchedReadEntryProcessor.java | 7 +
.../apache/bookkeeper/client/TestBatchedRead.java | 292 +++++++++++++++
.../client/TestSpeculativeBatchRead.java | 401 +++++++++++++++++++++
.../apache/bookkeeper/test/BookieClientTest.java | 2 +-
8 files changed, 1023 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index cff66a3cb3..fc83617cef 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -48,6 +48,8 @@ class ClientInternalConf {
final boolean enableBookieFailureTracking;
final boolean useV2WireProtocol;
final boolean enforceMinNumFaultDomainsForWrite;
+ final boolean batchReadEnabled;
+ final int nettyMaxFrameSizeBytes;
static ClientInternalConf defaultValues() {
return fromConfig(new ClientConfiguration());
@@ -72,9 +74,9 @@ class ClientInternalConf {
this.addEntryQuorumTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
this.throttleValue = conf.getThrottleValue();
this.bookieFailureHistoryExpirationMSec =
conf.getBookieFailureHistoryExpirationMSec();
-
+ this.batchReadEnabled = conf.isBatchReadEnabled();
+ this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes();
this.disableEnsembleChangeFeature =
featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
-
this.delayEnsembleChange = conf.getDelayEnsembleChange();
this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9486b2e632..6a98af5503 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle {
final long ledgerId;
final ExecutorService executor;
long lastAddPushed;
+ boolean notSupportBatch;
private enum HandleState {
OPEN,
@@ -641,6 +642,26 @@ public class LedgerHandle implements WriteHandle {
return SyncCallbackUtils.waitForResult(result);
}
+ /**
+ * Read a sequence of entries synchronously.
+ *
+ * @param startEntry
+ * start entry id
+ * @param maxCount
+ * the total entries count.
+ * @param maxSize
+ * the total entries size.
+ * @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
+ */
+ public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int
maxCount, long maxSize)
+ throws InterruptedException, BKException {
+ CompletableFuture<Enumeration<LedgerEntry>> result = new
CompletableFuture<>();
+
+ asyncBatchReadEntries(startEntry, maxCount, maxSize, new
SyncReadCallback(result), null);
+
+ return SyncCallbackUtils.waitForResult(result);
+ }
+
/**
* Read a sequence of entries synchronously, allowing to read after the
LastAddConfirmed range.<br>
* This is the same of
@@ -664,6 +685,27 @@ public class LedgerHandle implements WriteHandle {
return SyncCallbackUtils.waitForResult(result);
}
+ /**
+ * Read a sequence of entries synchronously, allowing to read after the
LastAddConfirmed range.<br>
+ * This is the same of
+ * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, Object) }
+ *
+ * @param firstEntry
+ * id of first entry of sequence (included)
+ * @param maxCount
+ *          the maximum number of entries to read
+ * @param maxSize
+ * the total entries size
+ */
+ public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long
firstEntry, int maxCount, long maxSize)
+ throws InterruptedException, BKException {
+ CompletableFuture<Enumeration<LedgerEntry>> result = new
CompletableFuture<>();
+
+ asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new
SyncReadCallback(result), null);
+
+ return SyncCallbackUtils.waitForResult(result);
+ }
+
/**
* Read a sequence of entries asynchronously.
*
@@ -695,6 +737,50 @@ public class LedgerHandle implements WriteHandle {
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}
+ /**
+ * Read a sequence of entries asynchronously.
+ * It sends a single RPC to get all the entries instead of sending multiple RPCs.
+ *
+ * @param startEntry
+ * id of first entry of sequence
+ * @param maxCount
+ * the entries count
+ * @param maxSize
+ * the total entries size
+ * @param cb
+ * object implementing read callback interface
+ * @param ctx
+ * control object
+ */
+ public void asyncBatchReadEntries(long startEntry, int maxCount, long
maxSize, ReadCallback cb, Object ctx) {
+ // Little sanity check
+ if (startEntry > lastAddConfirmed) {
+ LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{}
lastAddConfirmed:{}",
+ ledgerId, startEntry, lastAddConfirmed);
+ cb.readComplete(BKException.Code.ReadException, this, null, ctx);
+ return;
+ }
+ if (notSupportBatchRead()) {
+ long lastEntry = Math.min(startEntry + maxCount - 1,
lastAddConfirmed);
+ asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
+ } else {
+ asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new
ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
+ //If the bookie server does not support the batch read request, it will close the
+ // connection, and the client will get a BookieHandleNotAvailableException.
+ if (rc == Code.BookieHandleNotAvailableException) {
+ notSupportBatch = true;
+ long lastEntry = Math.min(startEntry + maxCount - 1,
lastAddConfirmed);
+ asyncReadEntriesInternal(startEntry, lastEntry, cb,
ctx, false);
+ } else {
+ cb.readComplete(rc, lh, seq, ctx);
+ }
+ }
+ }, ctx, false);
+ }
+ }
+
/**
* Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
* <br>This is the same of
@@ -734,6 +820,48 @@ public class LedgerHandle implements WriteHandle {
asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}
+ /**
+ * Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
+ * It sends a single RPC to get all the entries instead of sending multiple RPCs.
+ * @param startEntry
+ * id of first entry of sequence
+ * @param maxCount
+ * the entries count
+ * @param maxSize
+ * the total entries size
+ * @param cb
+ * object implementing read callback interface
+ * @param ctx
+ * control object
+ */
+ public void asyncBatchReadUnconfirmedEntries(long startEntry, int
maxCount, long maxSize, ReadCallback cb,
+ Object ctx) {
+ // Little sanity check
+ if (startEntry < 0) {
+ LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{}", ledgerId, startEntry);
+ cb.readComplete(BKException.Code.IncorrectParameterException,
this, null, ctx);
+ }
+ if (notSupportBatchRead()) {
+ long lastEntry = startEntry + maxCount - 1;
+ asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
+ } else {
+ asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new
ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
+ //If the bookie server does not support the batch read request, it will close the
+ // connection, and the client will get a BookieHandleNotAvailableException.
+ if (rc == Code.BookieHandleNotAvailableException) {
+ notSupportBatch = true;
+ long lastEntry = startEntry + maxCount - 1;
+ asyncReadEntriesInternal(startEntry, lastEntry, cb,
ctx, false);
+ } else {
+ cb.readComplete(rc, lh, seq, ctx);
+ }
+ }
+ }, ctx, false);
+ }
+ }
+
/**
* Read a sequence of entries asynchronously.
*
@@ -760,6 +888,123 @@ public class LedgerHandle implements WriteHandle {
return readEntriesInternalAsync(firstEntry, lastEntry, false);
}
+ /**
+ * Read a sequence of entries asynchronously.
+ * It sends a single RPC to get all the entries instead of sending multiple RPCs.
+ *
+ * @param startEntry
+ * id of first entry of sequence
+ * @param maxCount
+ * the entries count
+ * @param maxSize
+ * the total entries size
+ */
+ @Override
+ public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry,
int maxCount, long maxSize) {
+ // Little sanity check
+ if (startEntry < 0) {
+ LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{}", ledgerId, startEntry);
+ return FutureUtils.exception(new BKIncorrectParameterException());
+ }
+ if (startEntry > lastAddConfirmed) {
+ LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{}
lastAddConfirmed:{}",
+ ledgerId, startEntry, lastAddConfirmed);
+ return FutureUtils.exception(new BKReadException());
+ }
+ if (notSupportBatchRead()) {
+ long lastEntry = Math.min(startEntry + maxCount - 1,
lastAddConfirmed);
+ return readEntriesInternalAsync(startEntry, lastEntry, false);
+ }
+ CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
+ batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false)
+ .whenComplete((entries, ex) -> {
+ if (ex != null) {
+ //If the bookie server does not support the batch read request, it will close the
+ // connection, and the client will get a BookieHandleNotAvailableException.
+ if (ex instanceof
BKException.BKBookieHandleNotAvailableException) {
+ notSupportBatch = true;
+ long lastEntry = Math.min(startEntry + maxCount -
1, lastAddConfirmed);
+ readEntriesInternalAsync(startEntry, lastEntry,
false).whenComplete((entries1, ex1) -> {
+ if (ex1 != null) {
+ future.completeExceptionally(ex1);
+ } else {
+ future.complete(entries1);
+ }
+ });
+ } else {
+ future.completeExceptionally(ex);
+ }
+ } else {
+ future.complete(entries);
+ }
+ });
+ return future;
+ }
+
+ private boolean notSupportBatchRead() {
+ if (!clientCtx.getConf().batchReadEnabled) {
+ return true;
+ }
+ if (notSupportBatch) {
+ return true;
+ }
+ LedgerMetadata ledgerMetadata = getLedgerMetadata();
+ return ledgerMetadata.getEnsembleSize() !=
ledgerMetadata.getWriteQuorumSize();
+ }
+
+ private CompletableFuture<LedgerEntries>
batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
+ boolean isRecoveryRead) {
+ int nettyMaxFrameSizeBytes =
clientCtx.getConf().nettyMaxFrameSizeBytes;
+ if (maxSize > nettyMaxFrameSizeBytes) {
+ LOG.info(
+ "The max size is greater than nettyMaxFrameSizeBytes, use
nettyMaxFrameSizeBytes:{} to replace it.",
+ nettyMaxFrameSizeBytes);
+ maxSize = nettyMaxFrameSizeBytes;
+ }
+ if (maxSize <= 0) {
+ LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{}
to replace it.", nettyMaxFrameSizeBytes);
+ maxSize = nettyMaxFrameSizeBytes;
+ }
+ BatchedReadOp op = new BatchedReadOp(this, clientCtx,
+ startEntry, maxCount, maxSize, isRecoveryRead);
+ if (!clientCtx.isClientClosed()) {
+ // Waiting on the first one.
+ // This is not very helpful if there are multiple ensembles or if
bookie goes into unresponsive
+ // state later after N requests sent.
+ // Unfortunately it seems that alternatives are:
+ // - send reads one-by-one (up to the app)
+ // - rework LedgerHandle to send requests one-by-one (maybe later,
potential perf impact)
+ // - block worker pool (not good)
+ // Even with this implementation one should be more concerned
about OOME when all read responses arrive
+ // or about overloading bookies with these requests than about submission of many small requests.
+ // Naturally one of the solutions would be to submit smaller
batches and in this case
+ // current implementation will prevent next batch from starting
when bookie is
+ // unresponsive thus helpful enough.
+ if (clientCtx.getConf().waitForWriteSetMs >= 0) {
+ DistributionSchedule.WriteSet ws =
distributionSchedule.getWriteSet(startEntry);
+ try {
+ if (!waitForWritable(ws, ws.size() - 1,
clientCtx.getConf().waitForWriteSetMs)) {
+ op.allowFailFastOnUnwritableChannel();
+ }
+ } finally {
+ ws.recycle();
+ }
+ }
+
+ if (isHandleWritable()) {
+ // Ledger handle in read/write mode: submit to OSE for ordered
execution.
+ executeOrdered(op);
+ } else {
+ // Read-only ledger handle: bypass OSE and execute read
directly in client thread.
+ // This avoids a context-switch to OSE thread and thus reduces
latency.
+ op.run();
+ }
+ } else {
+
op.future().completeExceptionally(BKException.create(ClientClosedException));
+ }
+ return op.future();
+ }
+
/**
* Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
* <br>This is the same of
@@ -829,6 +1074,40 @@ public class LedgerHandle implements WriteHandle {
}
}
+ void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long
maxSize, ReadCallback cb,
+ Object ctx, boolean isRecoveryRead) {
+ if (!clientCtx.isClientClosed()) {
+ batchReadEntriesInternalAsync(startEntry, maxCount, maxSize,
isRecoveryRead)
+ .whenCompleteAsync(new
FutureEventListener<LedgerEntries>() {
+ @Override
+ public void onSuccess(LedgerEntries entries) {
+ cb.readComplete(
+ Code.OK,
+ LedgerHandle.this,
+ IteratorUtils.asEnumeration(
+
Iterators.transform(entries.iterator(), le -> {
+ LedgerEntry entry = new
LedgerEntry((LedgerEntryImpl) le);
+ le.close();
+ return entry;
+ })),
+ ctx);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof BKException) {
+ BKException bke = (BKException) cause;
+ cb.readComplete(bke.getCode(),
LedgerHandle.this, null, ctx);
+ } else {
+
cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null,
ctx);
+ }
+ }
+ }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
+ } else {
+ cb.readComplete(Code.ClientClosedException, LedgerHandle.this,
null, ctx);
+ }
+ }
+
/*
* Read the last entry in the ledger
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index d5e906d17d..e9bcddd0b3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -45,6 +45,23 @@ public interface ReadHandle extends Handle {
*/
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry);
+ /**
+ * Read a sequence of entries asynchronously.
+ *
+ * @param startEntry
+ * start entry id
+ * @param maxCount
+ * the total entries count.
+ * @param maxSize
+ * the total entries size.
+ * @return an handle to the result of the operation
+ */
+ default CompletableFuture<LedgerEntries> batchReadAsync(long startEntry,
int maxCount, long maxSize) {
+ CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
+ future.completeExceptionally(new UnsupportedOperationException());
+ return future;
+ }
+
/**
* Read a sequence of entries synchronously.
*
@@ -59,6 +76,21 @@ public interface ReadHandle extends Handle {
BKException.HANDLER);
}
+ /**
+ * Read a sequence of entries synchronously.
+ *
+ * @param startEntry
+ * start entry id
+ * @param maxCount
+ * the total entries count.
+ * @param maxSize
+ * the total entries size.
+ * @return the result of the operation
+ */
+ default LedgerEntries batchRead(long startEntry, int maxCount, long
maxSize)
+ throws BKException, InterruptedException {
+ return FutureUtils.result(batchReadAsync(startEntry, maxCount,
maxSize), BKException.HANDLER);
+ }
+
/**
* Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
* <br>This is the same of
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 66dc160fd5..924dee4ada 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -202,6 +202,9 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
protected static final String
CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";
+ //For the batch read API: if batch read is not stable, we can fall back to single reads via this config.
+ protected static final String BATCH_READ_ENABLED = "batchReadEnabled";
+
/**
* Construct a default client-side configuration.
*/
@@ -2077,6 +2080,10 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING,
5_000L);
}
+ public boolean isBatchReadEnabled() {
+ return getBoolean(BATCH_READ_ENABLED, true);
+ }
+
@Override
protected ClientConfiguration getThis() {
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
index 700952042f..6db3e14351 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
@@ -85,6 +85,13 @@ public class BatchedReadEntryProcessor extends
ReadEntryProcessor {
return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data,
(BatchedReadRequest) request);
}
+ @Override
+ public String toString() {
+ BatchedReadRequest br = (BatchedReadRequest) request;
+ return String.format("BatchedReadEntry(%d, %d %d, %d)",
br.getLedgerId(), br.getEntryId(), br.getMaxCount(),
+ br.getMaxSize());
+ }
+
protected void recycle() {
request.recycle();
super.reset();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
new file mode 100644
index 0000000000..1bb95ed047
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for batch reading.
+ */
+public class TestBatchedRead extends BookKeeperClusterTestCase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestBatchedRead.class);
+
+ final DigestType digestType;
+ final byte[] passwd = "sequence-read".getBytes();
+
+ public TestBatchedRead() {
+ super(6);
+ baseClientConf.setUseV2WireProtocol(true);
+ this.digestType = DigestType.CRC32;
+ }
+
+ long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int
numEntries)
+ throws Exception {
+ LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum,
digestType, passwd);
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(("" + i).getBytes());
+ }
+ lh.close();
+ return lh.getId();
+ }
+
+ BatchedReadOp createReadOp(LedgerHandle lh, long startEntry, int count) {
+ return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count,
1024 * count, false);
+ }
+
+ BatchedReadOp createRecoveryReadOp(LedgerHandle lh, long startEntry, int
count) {
+ return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count,
1024 * count, true);
+ }
+
+ @Test
+ public void testNormalRead() throws Exception {
+ int numEntries = 10;
+ long id = getLedgerToRead(5, 5, 2, numEntries);
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ //read single entry
+ for (int i = 0; i < numEntries; i++) {
+ BatchedReadOp readOp = createReadOp(lh, i, 1);
+ readOp.submit();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+ assertTrue(entries.hasNext());
+ LedgerEntry entry = entries.next();
+ assertNotNull(entry);
+ assertEquals(i, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ assertFalse(entries.hasNext());
+ }
+
+ // read multiple entries
+ BatchedReadOp readOp = createReadOp(lh, 0, numEntries);
+ readOp.submit();
+ Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
+
+ int numReads = 0;
+ while (iterator.hasNext()) {
+ LedgerEntry entry = iterator.next();
+ assertNotNull(entry);
+ assertEquals(numReads, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ ++numReads;
+ }
+ assertEquals(numEntries, numReads);
+ lh.close();
+ }
+
+ @Test
+ public void testReadWhenEnsembleNotEqualWQ() throws Exception {
+ int numEntries = 10;
+ long id = getLedgerToRead(5, 2, 2, numEntries);
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ //read single entry
+ for (int i = 0; i < numEntries; i++) {
+ BatchedReadOp readOp = createReadOp(lh, i, 1);
+ readOp.submit();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+ assertTrue(entries.hasNext());
+ LedgerEntry entry = entries.next();
+ assertNotNull(entry);
+ assertEquals(i, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ assertFalse(entries.hasNext());
+ }
+
+ // read multiple entries; because the ensemble size is not equal to the write quorum,
+ // the returned entries will be fewer than the max count.
+ for (int i = 0; i < numEntries; i++) {
+ BatchedReadOp readOp = createReadOp(lh, i, numEntries);
+ readOp.submit();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+ assertTrue(entries.hasNext());
+ LedgerEntry entry = entries.next();
+ assertNotNull(entry);
+ assertEquals(i, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ assertFalse(entries.hasNext());
+ }
+ lh.close();
+ }
+
+ private static <T> void expectFail(CompletableFuture<T> future, int
expectedRc) {
+ try {
+ result(future);
+ fail("Expect to fail");
+ } catch (Exception e) {
+ assertTrue(e instanceof BKException);
+ BKException bke = (BKException) e;
+ assertEquals(expectedRc, bke.getCode());
+ }
+ }
+
+ @Test
+ public void testReadMissingEntries() throws Exception {
+ int numEntries = 10;
+
+ long id = getLedgerToRead(5, 5, 2, numEntries);
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ // read single entry
+ BatchedReadOp readOp = createReadOp(lh, 10, 1);
+ readOp.submit();
+ expectFail(readOp.future(), Code.NoSuchEntryException);
+
+ // read multiple entries
+ readOp = createReadOp(lh, 8, 3);
+ readOp.submit();
+
+ int index = 8;
+ int numReads = 0;
+ Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
+ while (iterator.hasNext()) {
+ LedgerEntry entry = iterator.next();
+ assertNotNull(entry);
+ assertEquals(index, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ ++index;
+ ++numReads;
+ }
+ assertEquals(2, numReads);
+ lh.close();
+ }
+
+ @Test
+ public void testFailRecoveryReadMissingEntryImmediately() throws Exception
{
+ int numEntries = 1;
+
+ long id = getLedgerToRead(5, 5, 3, numEntries);
+
+ ClientConfiguration newConf = new ClientConfiguration()
+ .setReadEntryTimeout(30000);
+ newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper newBk = new BookKeeper(newConf);
+
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(10);
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ // sleep two bookies
+ sleepBookie(ensemble.get(0), latch1);
+ sleepBookie(ensemble.get(1), latch2);
+
+ BatchedReadOp readOp = createRecoveryReadOp(lh, 10, 1);
+ readOp.submit();
+ // should fail immediately if the missing entries don't cover the ack quorum
+ expectFail(readOp.future(), Code.NoSuchEntryException);
+ latch1.countDown();
+ latch2.countDown();
+
+ lh.close();
+ newBk.close();
+ }
+
+ @Test
+ public void testReadWithFailedBookies() throws Exception {
+ int numEntries = 10;
+
+ long id = getLedgerToRead(5, 3, 3, numEntries);
+
+ ClientConfiguration newConf = new ClientConfiguration()
+ .setReadEntryTimeout(30000);
+ newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper newBk = new BookKeeper(newConf);
+
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
+ // kill two bookies
+ killBookie(ensemble.get(0));
+ killBookie(ensemble.get(1));
+
+ // read multiple entries; because the ensemble size is not equal to the write quorum,
+ // the returned entries will be fewer than the max count.
+ int numReads = 0;
+ for (int i = 0; i < numEntries;) {
+ BatchedReadOp readOp = createReadOp(lh, i, numEntries);
+ readOp.submit();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+ if (!entries.hasNext()) {
+ i++;
+ continue;
+ }
+ while (entries.hasNext()) {
+ LedgerEntry entry = entries.next();
+ assertNotNull(entry);
+ assertEquals(i, Integer.parseInt(new
String(entry.getEntryBytes())));
+ entry.close();
+ i++;
+ numReads++;
+ }
+ }
+ assertEquals(10, numReads);
+ lh.close();
+ newBk.close();
+ }
+
+ @Test
+ public void testReadFailureWithFailedBookies() throws Exception {
+ int numEntries = 10;
+
+ long id = getLedgerToRead(5, 3, 3, numEntries);
+
+ ClientConfiguration newConf = new ClientConfiguration()
+ .setReadEntryTimeout(30000);
+ newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper newBk = new BookKeeper(newConf);
+
+ LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+ List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
+ // kill two bookies
+ killBookie(ensemble.get(0));
+ killBookie(ensemble.get(1));
+ killBookie(ensemble.get(2));
+
+ // read multiple entries
+ BatchedReadOp readOp = createReadOp(lh, 0, numEntries);
+ readOp.submit();
+ expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
+
+ lh.close();
+ newBk.close();
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
new file mode 100644
index 0000000000..3bf5e2d5e4
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
@@ -0,0 +1,401 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static
org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.BitSet;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for speculative reads issued through the batch read API
+ * ({@code asyncBatchReadEntries}).
+ */
+public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestSpeculativeBatchRead.class);
+
+ // digest type used for every ledger created by these tests
+ private final DigestType digestType;
+ // shared password for all test ledgers
+ byte[] passwd = "specPW".getBytes();
+
+ // start a 10-bookie cluster so speculative retries have spare replicas
+ public TestSpeculativeBatchRead() {
+ super(10);
+ this.digestType = DigestType.CRC32;
+ }
+
+ /**
+  * Creates a ledger with the given ensemble/quorum sizes, writes 10
+  * identical entries, closes it, and returns its id for later reads.
+  */
+ long getLedgerToRead(int ensemble, int quorum) throws Exception {
+ byte[] data = "Data for test".getBytes();
+ LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType,
passwd);
+ for (int i = 0; i < 10; i++) {
+ l.addEntry(data);
+ }
+ l.close();
+
+ return l.getId();
+ }
+
+ /**
+  * Builds a test client with the given speculative read timeout in ms
+  * (0 disables speculation). The 30s read timeout is deliberately long
+  * so a hung read is distinguishable from a speculative retry.
+  */
+ @SuppressWarnings("deprecation")
+ BookKeeperTestClient createClient(int specTimeout) throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setSpeculativeReadTimeout(specTimeout)
+ .setReadTimeout(30000)
+ .setUseV2WireProtocol(true)
+ .setReorderReadSequenceEnabled(true)
+ .setEnsemblePlacementPolicySlowBookies(true)
+ .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ return new BookKeeperTestClient(conf, new TestStatsProvider());
+ }
+
+ /**
+  * ReadCallback that records completion, success (rc == OK) and latency,
+  * and exposes latch-based expectations for the tests below.
+  */
+ class LatchCallback implements ReadCallback {
+ CountDownLatch l = new CountDownLatch(1);
+ boolean success = false;
+ long startMillis = System.currentTimeMillis();
+ long endMillis = Long.MAX_VALUE;
+
+ public void readComplete(int rc,
+ LedgerHandle lh,
+ Enumeration<LedgerEntry> seq,
+ Object ctx) {
+ endMillis = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got response {} {}", rc, getDuration());
+ }
+ success = rc == BKException.Code.OK;
+ l.countDown();
+ }
+
+ // elapsed time between construction and completion
+ long getDuration() {
+ return endMillis - startMillis;
+ }
+
+ // the read must complete within the given time AND succeed;
+ // previously this only printed the await result, so a hung or
+ // failed read could never fail the test
+ void expectSuccess(int milliseconds) throws Exception {
+ assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ assertTrue(success);
+ }
+
+ // the read must complete within the given time with a non-OK rc
+ void expectFail(int milliseconds) throws Exception {
+ assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ assertFalse(success);
+ }
+
+ // the read must still be outstanding after the given time
+ void expectTimeout(int milliseconds) throws Exception {
+ assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ /**
+ * Test basic speculative functionality.
+ * - Create 2 clients with read timeout disabled, one with spec
+ * read enabled, the other not.
+ * - create ledger
+ * - sleep second bookie in ensemble
+ * - read first entry, both should find on first bookie.
+ * - read second entry, spec client should find on bookie three,
+ * non spec client should hang.
+ */
+ @Test
+ public void testSpeculativeRead() throws Exception {
+ long id = getLedgerToRead(3, 2);
+ BookKeeperTestClient bknospec = createClient(0); // disabled
+ BookKeeperTestClient bkspec = createClient(2000);
+
+ LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd);
+ LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep second bookie
+ CountDownLatch sleepLatch = new CountDownLatch(1);
+ BookieId second =
lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1);
+ sleepBookie(second, sleepLatch);
+
+ try {
+ // read first entry, both go to first bookie, should be fine
+ LatchCallback nospeccb = new LatchCallback();
+ LatchCallback speccb = new LatchCallback();
+ lnospec.asyncBatchReadEntries(0, 1, 1024, nospeccb, null);
+ lspec.asyncBatchReadEntries(0, 1, 1024, speccb, null);
+ nospeccb.expectSuccess(2000);
+ speccb.expectSuccess(2000);
+
+ // read second entry, both look for second bookie, spec read client
+ // tries third bookie, nonspec client hangs as read timeout is
very long.
+ nospeccb = new LatchCallback();
+ speccb = new LatchCallback();
+ // NOTE(review): these two calls use the non-batch asyncReadEntries,
+ // unlike the rest of this batch-read test — confirm this is intended.
+ lnospec.asyncReadEntries(1, 1, nospeccb, null);
+ lspec.asyncReadEntries(1, 1, speccb, null);
+ speccb.expectSuccess(4000);
+ nospeccb.expectTimeout(4000);
+ // Check that the second bookie is registered as slow at entryId 1
+ RackawareEnsemblePlacementPolicy rep =
(RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy();
+ assertTrue(rep.slowBookies.asMap().size() == 1);
+
+ assertTrue(
+ "Stats should not reflect speculative reads if disabled",
+ bknospec.getTestStatsProvider()
+ .getCounter(CLIENT_SCOPE + "." +
SPECULATIVE_READ_COUNT).get() == 0);
+ assertTrue(
+ "Stats should reflect speculative reads",
+ bkspec.getTestStatsProvider()
+ .getCounter(CLIENT_SCOPE + "." +
SPECULATIVE_READ_COUNT).get() > 0);
+ } finally {
+ sleepLatch.countDown();
+ lspec.close();
+ lnospec.close();
+ bkspec.close();
+ bknospec.close();
+ }
+ }
+
+ /**
+ * Test that if more than one replica is down, we can still read, as long
as the quorum
+ * size is larger than the number of down replicas.
+ *
+ * Timing-sensitive: each expectation below is derived from how many
+ * speculative timeouts the read must burn through before reaching a
+ * live bookie (5-bookie ensemble, bookies 1, 2 and 4 asleep).
+ */
+ @Test
+ public void testSpeculativeReadMultipleReplicasDown() throws Exception {
+ long id = getLedgerToRead(5, 5);
+ int timeout = 5000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep bookie 1, 2 & 4
+ CountDownLatch sleepLatch = new CountDownLatch(1);
+ sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1),
sleepLatch);
+ sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2),
sleepLatch);
+ sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(4),
sleepLatch);
+
+ try {
+ // read first entry, should complete faster than timeout
+ // as bookie 0 has the entry
+ LatchCallback latch0 = new LatchCallback();
+ l.asyncBatchReadEntries(0, 1, 1024, latch0, null);
+ latch0.expectSuccess(timeout / 2);
+
+ // second should have to hit two timeouts (bookie 1 & 2)
+ // bookie 3 has the entry
+ LatchCallback latch1 = new LatchCallback();
+ l.asyncBatchReadEntries(1, 1, 1024, latch1, null);
+ latch1.expectTimeout(timeout);
+ latch1.expectSuccess(timeout * 2);
+ LOG.info("Timeout {} latch1 duration {}", timeout,
latch1.getDuration());
+ assertTrue("should have taken longer than two timeouts, but less
than 3",
+ latch1.getDuration() >= timeout * 2
+ && latch1.getDuration() < timeout * 3);
+
+ // bookies 1 & 2 should be registered as slow bookies because of
speculative reads
+ Set<BookieId> expectedSlowBookies = new HashSet<>();
+
expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1));
+
expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2));
+ assertEquals(((RackawareEnsemblePlacementPolicy)
bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(),
+ expectedSlowBookies);
+
+ // third should not hit timeouts since bookies 1 & 2 are
registered as slow
+ // bookie 3 has the entry
+ LatchCallback latch2 = new LatchCallback();
+ l.asyncBatchReadEntries(2, 1, 1024, latch2, null);
+ latch2.expectSuccess(timeout);
+
+ // fourth should have no timeout
+ // bookie 3 has the entry
+ LatchCallback latch3 = new LatchCallback();
+ l.asyncBatchReadEntries(3, 1, 1024, latch3, null);
+ latch3.expectSuccess(timeout / 2);
+
+ // fifth should hit one timeout, (bookie 4)
+ // bookie 0 has the entry
+ LatchCallback latch4 = new LatchCallback();
+ l.asyncBatchReadEntries(4, 1, 1024, latch4, null);
+ latch4.expectTimeout(timeout / 2);
+ latch4.expectSuccess(timeout);
+ LOG.info("Timeout {} latch4 duration {}", timeout,
latch4.getDuration());
+ assertTrue("should have taken longer than one timeout, but less
than 2",
+ latch4.getDuration() >= timeout
+ && latch4.getDuration() < timeout * 2);
+ } finally {
+ sleepLatch.countDown();
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+ * Test that if after a speculative read is kicked off, the original read
completes
+ * nothing bad happens.
+ */
+ @Test
+ public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
+ long id = getLedgerToRead(2, 2);
+ int timeout = 1000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep bookies — both members of the 2-bookie ensemble, so the
+ // first read can only succeed once bookie 0 is woken up below
+ CountDownLatch sleepLatch0 = new CountDownLatch(1);
+ CountDownLatch sleepLatch1 = new CountDownLatch(1);
+ sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(0),
sleepLatch0);
+ sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1),
sleepLatch1);
+
+ try {
+ // read goes to first bookie, spec read timeout occurs,
+ // goes to second
+ LatchCallback latch0 = new LatchCallback();
+ l.asyncBatchReadEntries(0, 1, 1024, latch0, null);
+ latch0.expectTimeout(timeout);
+
+ // wake up first bookie
+ sleepLatch0.countDown();
+ latch0.expectSuccess(timeout / 2);
+
+ sleepLatch1.countDown();
+
+ // check we can read next entry without issue
+ LatchCallback latch1 = new LatchCallback();
+ l.asyncBatchReadEntries(1, 1, 1024, latch1, null);
+ latch1.expectSuccess(timeout / 2);
+ } finally {
+ // countDown is idempotent — safe even after the calls above
+ sleepLatch0.countDown();
+ sleepLatch1.countDown();
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+ * Unit test to check if the scheduled speculative task gets cancelled
+ * on successful read.
+ */
+ @Test
+ public void testSpeculativeReadScheduledTaskCancel() throws Exception {
+ long id = getLedgerToRead(3, 2);
+ int timeout = 1000;
+ BookKeeper bkspec = createClient(timeout);
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+ try {
+ // assert inside the try: the old finally-block assertion would
+ // NPE on a null op if construction/initiate threw, masking the
+ // real failure
+ BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0,
5, 5120, false);
+ op.initiate();
+ op.future().get();
+ // on a successful read the speculative retry task must be cancelled
+ assertNull("Speculative Read tasks must be null",
op.getSpeculativeTask());
+ } finally {
+ // release client resources even if the read fails
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+ * Unit test for the speculative read scheduling method.
+ *
+ * Drives maybeSendSpeculativeRead directly with hand-built "heard from"
+ * BitSets instead of relying on real bookie timeouts.
+ */
+ @Test
+ public void testSpeculativeReadScheduling() throws Exception {
+ long id = getLedgerToRead(3, 2);
+ int timeout = 1000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ List<BookieId> ensemble =
l.getLedgerMetadata().getAllEnsembles().get(0L);
+ BitSet allHosts = new BitSet(ensemble.size());
+ for (int i = 0; i < ensemble.size(); i++) {
+ allHosts.set(i, true);
+ }
+ BitSet noHost = new BitSet(ensemble.size());
+ BitSet secondHostOnly = new BitSet(ensemble.size());
+ secondHostOnly.set(1, true);
+ BatchedReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
+ try {
+ BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0,
5, 5120, false);
+ // if we've already heard from all hosts,
+ // we only send the initial read
+ req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024);
+ assertTrue("Should have sent to first",
+
req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
+ assertNull("Should not have sent another",
+ req0.maybeSendSpeculativeRead(allHosts));
+
+ // if we have heard from some hosts, but not one we have sent to
+ // send again
+ req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2, 1, 1024);
+ assertTrue("Should have sent to third",
+
req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
+ assertTrue("Should have sent to first",
+
req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0)));
+
+ // if we have heard from some hosts, which includes one we sent to
+ // do not read again
+ req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4, 1, 1024);
+ assertTrue("Should have sent to second",
+
req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
+ assertNull("Should not have sent another",
+ req4.maybeSendSpeculativeRead(secondHostOnly));
+ } finally {
+ // let any in-flight speculative requests drain before closing,
+ // polling once per second for up to ~10 seconds per request
+ for (BatchedReadOp.LedgerEntryRequest req
+ : new BatchedReadOp.LedgerEntryRequest[] { req0, req2,
req4 }) {
+ if (req != null) {
+ int i = 0;
+ while (!req.isComplete()) {
+ if (i++ > 10) {
+ break; // wait for up to 10 seconds
+ }
+ Thread.sleep(1000);
+ }
+ assertTrue("Request should be done", req.isComplete());
+ }
+ }
+
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+  * With LocalBookieEnsemblePlacementPolicy and a single-bookie ensemble,
+  * building a SequenceReadRequest must still yield a non-null write set.
+  */
+ @Test
+ public void testSequenceReadLocalEnsemble() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setSpeculativeReadTimeout(1000)
+
.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class)
+ .setReorderReadSequenceEnabled(true)
+ .setEnsemblePlacementPolicySlowBookies(true)
+ .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper bkspec = new BookKeeperTestClient(conf, new
TestStatsProvider());
+ try {
+ LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd);
+ List<BookieId> ensemble =
l.getLedgerMetadata().getAllEnsembles().get(0L);
+ BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5,
5120, false);
+ BatchedReadOp.LedgerEntryRequest req0 = op.new
SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024);
+ assertNotNull(req0.writeSet);
+ l.close();
+ } finally {
+ // previously neither the ledger handle nor the client was
+ // closed, leaking connections for the rest of the suite
+ bkspec.close();
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 5b96c52a0b..60f89159a0 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -359,7 +359,7 @@ public class BookieClientTest {
}
@Test
- public void testBatchedRead() throws Exception {
+ public void testBatchRead() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setUseV2WireProtocol(true);
BookieClient bc = new BookieClientImpl(conf, eventLoopGroup,