This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 22097343ce [BP-62] LedgerHandle introduces batch read API. (#4195)
22097343ce is described below

commit 22097343ceed1312964ad077cdcd9fbd7e3e026b
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Feb 5 16:58:38 2024 +0800

    [BP-62] LedgerHandle introduces batch read API. (#4195)
    
    ### Motivation
    This is the fifth PR for the batch read 
    (https://github.com/apache/bookkeeper/pull/4051) feature.
    
    LedgerHandle introduces batch read API.
    
    This PR is based on #4190; please merge that one first.
---
 .../bookkeeper/client/ClientInternalConf.java      |   6 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 279 ++++++++++++++
 .../apache/bookkeeper/client/api/ReadHandle.java   |  32 ++
 .../bookkeeper/conf/ClientConfiguration.java       |   7 +
 .../proto/BatchedReadEntryProcessor.java           |   7 +
 .../apache/bookkeeper/client/TestBatchedRead.java  | 292 +++++++++++++++
 .../client/TestSpeculativeBatchRead.java           | 401 +++++++++++++++++++++
 .../apache/bookkeeper/test/BookieClientTest.java   |   2 +-
 8 files changed, 1023 insertions(+), 3 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index cff66a3cb3..fc83617cef 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -48,6 +48,8 @@ class ClientInternalConf {
     final boolean enableBookieFailureTracking;
     final boolean useV2WireProtocol;
     final boolean enforceMinNumFaultDomainsForWrite;
+    final boolean batchReadEnabled;
+    final int nettyMaxFrameSizeBytes;
 
     static ClientInternalConf defaultValues() {
         return fromConfig(new ClientConfiguration());
@@ -72,9 +74,9 @@ class ClientInternalConf {
         this.addEntryQuorumTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
         this.throttleValue = conf.getThrottleValue();
         this.bookieFailureHistoryExpirationMSec = 
conf.getBookieFailureHistoryExpirationMSec();
-
+        this.batchReadEnabled = conf.isBatchReadEnabled();
+        this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes();
         this.disableEnsembleChangeFeature = 
featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
-
         this.delayEnsembleChange = conf.getDelayEnsembleChange();
         this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
         this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9486b2e632..6a98af5503 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle {
     final long ledgerId;
     final ExecutorService executor;
     long lastAddPushed;
+    boolean notSupportBatch;
 
     private enum HandleState {
         OPEN,
@@ -641,6 +642,26 @@ public class LedgerHandle implements WriteHandle {
         return SyncCallbackUtils.waitForResult(result);
     }
 
+    /**
+     * Read a sequence of entries synchronously.
+     *
+     * @param startEntry
+     *          start entry id
+     * @param maxCount
+     *          the total entries count.
+     * @param maxSize
+     *          the total entries size.
+     * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, 
Object)
+     */
+    public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int 
maxCount, long maxSize)
+            throws InterruptedException, BKException {
+        CompletableFuture<Enumeration<LedgerEntry>> result = new 
CompletableFuture<>();
+
+        asyncBatchReadEntries(startEntry, maxCount, maxSize, new 
SyncReadCallback(result), null);
+
+        return SyncCallbackUtils.waitForResult(result);
+    }
+
     /**
      * Read a sequence of entries synchronously, allowing to read after the 
LastAddConfirmed range.<br>
      * This is the same of
@@ -664,6 +685,27 @@ public class LedgerHandle implements WriteHandle {
         return SyncCallbackUtils.waitForResult(result);
     }
 
+    /**
+     * Read a sequence of entries synchronously, allowing to read after the 
LastAddConfirmed range.<br>
+     * This is the same of
+     * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean, 
ReadCallback, Object) }
+     *
+     * @param firstEntry
+     *          id of first entry of sequence (included)
+     * @param maxCount
+     *          the maximum number of entries to read
+     * @param maxSize
+     *          the total entries size
+     */
+    public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long 
firstEntry, int maxCount, long maxSize)
+            throws InterruptedException, BKException {
+        CompletableFuture<Enumeration<LedgerEntry>> result = new 
CompletableFuture<>();
+
+        asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new 
SyncReadCallback(result), null);
+
+        return SyncCallbackUtils.waitForResult(result);
+    }
+
     /**
      * Read a sequence of entries asynchronously.
      *
@@ -695,6 +737,50 @@ public class LedgerHandle implements WriteHandle {
         asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
     }
 
+    /**
+     * Read a sequence of entries asynchronously.
+     * It sends one RPC to get all entries instead of sending multiple RPCs 
to get the entries one by one.
+     *
+     * @param startEntry
+     *          id of first entry of sequence
+     * @param maxCount
+     *          the entries count
+     * @param maxSize
+     *          the total entries size
+     * @param cb
+     *          object implementing read callback interface
+     * @param ctx
+     *          control object
+     */
+    public void asyncBatchReadEntries(long startEntry, int maxCount, long 
maxSize, ReadCallback cb, Object ctx) {
+        // Little sanity check
+        if (startEntry > lastAddConfirmed) {
+            LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} 
lastAddConfirmed:{}",
+                    ledgerId, startEntry, lastAddConfirmed);
+            cb.readComplete(BKException.Code.ReadException, this, null, ctx);
+            return;
+        }
+        if (notSupportBatchRead()) {
+            long lastEntry = Math.min(startEntry + maxCount - 1, 
lastAddConfirmed);
+            asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
+        } else {
+            asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new 
ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                    //If the bookie server not support the batch read request, 
the bookie server will close the
+                    // connection, then get the 
BookieHandleNotAvailableException.
+                    if (rc == Code.BookieHandleNotAvailableException) {
+                        notSupportBatch = true;
+                        long lastEntry = Math.min(startEntry + maxCount - 1, 
lastAddConfirmed);
+                        asyncReadEntriesInternal(startEntry, lastEntry, cb, 
ctx, false);
+                    } else {
+                        cb.readComplete(rc, lh, seq, ctx);
+                    }
+                }
+            }, ctx, false);
+        }
+    }
+
     /**
      * Read a sequence of entries asynchronously, allowing to read after the 
LastAddConfirmed range.
      * <br>This is the same of
@@ -734,6 +820,48 @@ public class LedgerHandle implements WriteHandle {
         asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
     }
 
+    /**
+     * Read a sequence of entries asynchronously, allowing to read after the 
LastAddConfirmed range.
+     * It sends one RPC to get all entries instead of sending multiple RPCs 
to get the entries one by one.
+     * @param startEntry
+     *          id of first entry of sequence
+     * @param maxCount
+     *          the entries count
+     * @param maxSize
+     *          the total entries size
+     * @param cb
+     *          object implementing read callback interface
+     * @param ctx
+     *          control object
+     */
+    public void asyncBatchReadUnconfirmedEntries(long startEntry, int 
maxCount, long maxSize, ReadCallback cb,
+            Object ctx) {
+        // Little sanity check
+        if (startEntry < 0) {
+            LOG.error("IncorrectParameterException on ledgerId:{} 
firstEntry:{}", ledgerId, startEntry);
+            cb.readComplete(BKException.Code.IncorrectParameterException, 
this, null, ctx);
+        }
+        if (notSupportBatchRead()) {
+            long lastEntry = startEntry + maxCount - 1;
+            asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false);
+        } else {
+            asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new 
ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                    //If the bookie server not support the batch read request, 
the bookie server will close the
+                    // connection, then get the 
BookieHandleNotAvailableException.
+                    if (rc == Code.BookieHandleNotAvailableException) {
+                        notSupportBatch = true;
+                        long lastEntry = startEntry + maxCount - 1;
+                        asyncReadEntriesInternal(startEntry, lastEntry, cb, 
ctx, false);
+                    } else {
+                        cb.readComplete(rc, lh, seq, ctx);
+                    }
+                }
+            }, ctx, false);
+        }
+    }
+
     /**
      * Read a sequence of entries asynchronously.
      *
@@ -760,6 +888,123 @@ public class LedgerHandle implements WriteHandle {
         return readEntriesInternalAsync(firstEntry, lastEntry, false);
     }
 
+    /**
+     * Read a sequence of entries asynchronously.
+     * It sends one RPC to get all entries instead of sending multiple RPCs 
to get the entries one by one.
+     *
+     * @param startEntry
+     *          id of first entry of sequence
+     * @param maxCount
+     *          the entries count
+     * @param maxSize
+     *          the total entries size
+     */
+    @Override
+    public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, 
int maxCount, long maxSize) {
+        // Little sanity check
+        if (startEntry < 0) {
+            LOG.error("IncorrectParameterException on ledgerId:{} 
firstEntry:{}", ledgerId, startEntry);
+            return FutureUtils.exception(new BKIncorrectParameterException());
+        }
+        if (startEntry > lastAddConfirmed) {
+            LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} 
lastAddConfirmed:{}",
+                    ledgerId, startEntry, lastAddConfirmed);
+            return FutureUtils.exception(new BKReadException());
+        }
+        if (notSupportBatchRead()) {
+            long lastEntry = Math.min(startEntry + maxCount - 1, 
lastAddConfirmed);
+            return readEntriesInternalAsync(startEntry, lastEntry, false);
+        }
+        CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
+        batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false)
+                .whenComplete((entries, ex) -> {
+                    if (ex != null) {
+                        //If the bookie server not support the batch read 
request, the bookie server will close the
+                        // connection, then get the 
BookieHandleNotAvailableException.
+                        if (ex instanceof 
BKException.BKBookieHandleNotAvailableException) {
+                            notSupportBatch = true;
+                            long lastEntry = Math.min(startEntry + maxCount - 
1, lastAddConfirmed);
+                            readEntriesInternalAsync(startEntry, lastEntry, 
false).whenComplete((entries1, ex1) -> {
+                                if (ex1 != null) {
+                                    future.completeExceptionally(ex1);
+                                } else {
+                                    future.complete(entries1);
+                                }
+                            });
+                        } else {
+                            future.completeExceptionally(ex);
+                        }
+                    } else {
+                        future.complete(entries);
+                    }
+                });
+        return future;
+    }
+
+    private boolean notSupportBatchRead() {
+        if (!clientCtx.getConf().batchReadEnabled) {
+            return true;
+        }
+        if (notSupportBatch) {
+            return true;
+        }
+        LedgerMetadata ledgerMetadata = getLedgerMetadata();
+        return ledgerMetadata.getEnsembleSize() != 
ledgerMetadata.getWriteQuorumSize();
+    }
+
+    private CompletableFuture<LedgerEntries> 
batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
+            boolean isRecoveryRead) {
+        int nettyMaxFrameSizeBytes = 
clientCtx.getConf().nettyMaxFrameSizeBytes;
+        if (maxSize > nettyMaxFrameSizeBytes) {
+            LOG.info(
+                "The max size is greater than nettyMaxFrameSizeBytes, use 
nettyMaxFrameSizeBytes:{} to replace it.",
+                nettyMaxFrameSizeBytes);
+            maxSize = nettyMaxFrameSizeBytes;
+        }
+        if (maxSize <= 0) {
+            LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} 
to replace it.", nettyMaxFrameSizeBytes);
+            maxSize = nettyMaxFrameSizeBytes;
+        }
+        BatchedReadOp op = new BatchedReadOp(this, clientCtx,
+                startEntry, maxCount, maxSize, isRecoveryRead);
+        if (!clientCtx.isClientClosed()) {
+            // Waiting on the first one.
+            // This is not very helpful if there are multiple ensembles or if 
bookie goes into unresponsive
+            // state later after N requests sent.
+            // Unfortunately it seems that alternatives are:
+            // - send reads one-by-one (up to the app)
+            // - rework LedgerHandle to send requests one-by-one (maybe later, 
potential perf impact)
+            // - block worker pool (not good)
+            // Even with this implementation one should be more concerned 
about OOME when all read responses arrive
+            // or about overloading bookies with these requests then about 
submission of many small requests.
+            // Naturally one of the solutions would be to submit smaller 
batches and in this case
+            // current implementation will prevent next batch from starting 
when bookie is
+            // unresponsive thus helpful enough.
+            if (clientCtx.getConf().waitForWriteSetMs >= 0) {
+                DistributionSchedule.WriteSet ws = 
distributionSchedule.getWriteSet(startEntry);
+                try {
+                    if (!waitForWritable(ws, ws.size() - 1, 
clientCtx.getConf().waitForWriteSetMs)) {
+                        op.allowFailFastOnUnwritableChannel();
+                    }
+                } finally {
+                    ws.recycle();
+                }
+            }
+
+            if (isHandleWritable()) {
+                // Ledger handle in read/write mode: submit to OSE for ordered 
execution.
+                executeOrdered(op);
+            } else {
+                // Read-only ledger handle: bypass OSE and execute read 
directly in client thread.
+                // This avoids a context-switch to OSE thread and thus reduces 
latency.
+                op.run();
+            }
+        } else {
+            
op.future().completeExceptionally(BKException.create(ClientClosedException));
+        }
+        return op.future();
+    }
+
     /**
      * Read a sequence of entries asynchronously, allowing to read after the 
LastAddConfirmed range.
      * <br>This is the same of
@@ -829,6 +1074,40 @@ public class LedgerHandle implements WriteHandle {
         }
     }
 
+    void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long 
maxSize, ReadCallback cb,
+            Object ctx, boolean isRecoveryRead) {
+        if (!clientCtx.isClientClosed()) {
+            batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, 
isRecoveryRead)
+                    .whenCompleteAsync(new 
FutureEventListener<LedgerEntries>() {
+                        @Override
+                        public void onSuccess(LedgerEntries entries) {
+                            cb.readComplete(
+                                    Code.OK,
+                                    LedgerHandle.this,
+                                    IteratorUtils.asEnumeration(
+                                            
Iterators.transform(entries.iterator(), le -> {
+                                                LedgerEntry entry = new 
LedgerEntry((LedgerEntryImpl) le);
+                                                le.close();
+                                                return entry;
+                                            })),
+                                    ctx);
+                        }
+
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            if (cause instanceof BKException) {
+                                BKException bke = (BKException) cause;
+                                cb.readComplete(bke.getCode(), 
LedgerHandle.this, null, ctx);
+                            } else {
+                                
cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, 
ctx);
+                            }
+                        }
+                    }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
+        } else {
+            cb.readComplete(Code.ClientClosedException, LedgerHandle.this, 
null, ctx);
+        }
+    }
+
     /*
      * Read the last entry in the ledger
      *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index d5e906d17d..e9bcddd0b3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -45,6 +45,23 @@ public interface ReadHandle extends Handle {
      */
     CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry);
 
+    /**
+     * Read a sequence of entries asynchronously.
+     *
+     * @param startEntry
+     *          start entry id
+     * @param maxCount
+     *          the total entries count.
+     * @param maxSize
+     *          the total entries size.
+     * @return an handle to the result of the operation
+     */
+    default CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, 
int maxCount, long maxSize) {
+        CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException());
+        return future;
+    }
+
     /**
      * Read a sequence of entries synchronously.
      *
@@ -59,6 +76,21 @@ public interface ReadHandle extends Handle {
                                                               
BKException.HANDLER);
     }
 
+    /**
+     *
+     * @param startEntry
+     *          start entry id
+     * @param maxCount
+     *          the total entries count.
+     * @param maxSize
+     *          the total entries size.
+     * @return the result of the operation
+     */
+    default LedgerEntries batchRead(long startEntry, int maxCount, long 
maxSize)
+            throws BKException, InterruptedException {
+        return FutureUtils.result(batchReadAsync(startEntry, maxCount, 
maxSize), BKException.HANDLER);
+    }
+
     /**
      * Read a sequence of entries asynchronously, allowing to read after the 
LastAddConfirmed range.
      * <br>This is the same of
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 66dc160fd5..924dee4ada 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -202,6 +202,9 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String 
CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
             "clientConnectBookieUnavailableLogThrottling";
 
+    //For the batch read API: if the batch read is not stable, we can fall 
back to single read via this config.
+    protected static final String BATCH_READ_ENABLED = "batchReadEnabled";
+
     /**
      * Construct a default client-side configuration.
      */
@@ -2077,6 +2080,10 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 
5_000L);
     }
 
+    public boolean isBatchReadEnabled() {
+        return getBoolean(BATCH_READ_ENABLED, true);
+    }
+
     @Override
     protected ClientConfiguration getThis() {
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
index 700952042f..6db3e14351 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
@@ -85,6 +85,13 @@ public class BatchedReadEntryProcessor extends 
ReadEntryProcessor {
         return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data, 
(BatchedReadRequest) request);
     }
 
+    @Override
+    public String toString() {
+        BatchedReadRequest br = (BatchedReadRequest) request;
+        return String.format("BatchedReadEntry(%d, %d %d, %d)", 
br.getLedgerId(), br.getEntryId(), br.getMaxCount(),
+                br.getMaxSize());
+    }
+
     protected void recycle() {
         request.recycle();
         super.reset();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
new file mode 100644
index 0000000000..1bb95ed047
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for batch reading.
+ */
+public class TestBatchedRead extends BookKeeperClusterTestCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestBatchedRead.class);
+
+    final DigestType digestType;
+    final byte[] passwd = "sequence-read".getBytes();
+
+    public TestBatchedRead() {
+        super(6);
+        baseClientConf.setUseV2WireProtocol(true);
+        this.digestType = DigestType.CRC32;
+    }
+
+    long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int 
numEntries)
+            throws Exception {
+        LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum, 
digestType, passwd);
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(("" + i).getBytes());
+        }
+        lh.close();
+        return lh.getId();
+    }
+
+    BatchedReadOp createReadOp(LedgerHandle lh, long startEntry, int count) {
+        return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 
1024 * count, false);
+    }
+
+    BatchedReadOp createRecoveryReadOp(LedgerHandle lh, long startEntry, int 
count) {
+        return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 
1024 * count, true);
+    }
+
+    @Test
+    public void testNormalRead() throws Exception {
+        int numEntries = 10;
+        long id = getLedgerToRead(5, 5, 2, numEntries);
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        //read single entry
+        for (int i = 0; i < numEntries; i++) {
+            BatchedReadOp readOp = createReadOp(lh, i, 1);
+            readOp.submit();
+            Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+            assertTrue(entries.hasNext());
+            LedgerEntry entry = entries.next();
+            assertNotNull(entry);
+            assertEquals(i, Integer.parseInt(new 
String(entry.getEntryBytes())));
+            entry.close();
+            assertFalse(entries.hasNext());
+        }
+
+        // read multiple entries
+        BatchedReadOp readOp = createReadOp(lh, 0, numEntries);
+        readOp.submit();
+        Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
+
+        int numReads = 0;
+        while (iterator.hasNext()) {
+            LedgerEntry entry = iterator.next();
+            assertNotNull(entry);
+            assertEquals(numReads, Integer.parseInt(new 
String(entry.getEntryBytes())));
+            entry.close();
+            ++numReads;
+        }
+        assertEquals(numEntries, numReads);
+        lh.close();
+    }
+
+    @Test
+    public void testReadWhenEnsembleNotEqualWQ() throws Exception {
+        int numEntries = 10;
+        long id = getLedgerToRead(5, 2, 2, numEntries);
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        //read single entry
+        for (int i = 0; i < numEntries; i++) {
+            BatchedReadOp readOp = createReadOp(lh, i, 1);
+            readOp.submit();
+            Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+            assertTrue(entries.hasNext());
+            LedgerEntry entry = entries.next();
+            assertNotNull(entry);
+            assertEquals(i, Integer.parseInt(new 
String(entry.getEntryBytes())));
+            entry.close();
+            assertFalse(entries.hasNext());
+        }
+
+        // read multiple entries, because the ensemble is not equals with 
write quorum, the return entries
+        // will less than max count.
+        for (int i = 0; i < numEntries; i++) {
+            BatchedReadOp readOp = createReadOp(lh, i, numEntries);
+            readOp.submit();
+            Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+            assertTrue(entries.hasNext());
+            LedgerEntry entry = entries.next();
+            assertNotNull(entry);
+            assertEquals(i, Integer.parseInt(new 
String(entry.getEntryBytes())));
+            entry.close();
+            assertFalse(entries.hasNext());
+        }
+        lh.close();
+    }
+
+    private static <T> void expectFail(CompletableFuture<T> future, int 
expectedRc) {
+        try {
+            result(future);
+            fail("Expect to fail");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(expectedRc, bke.getCode());
+        }
+    }
+
+    @Test
+    public void testReadMissingEntries() throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 5, 2, numEntries);
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        // read single entry
+        BatchedReadOp readOp = createReadOp(lh, 10, 1);
+        readOp.submit();
+        expectFail(readOp.future(), Code.NoSuchEntryException);
+
+        // read multiple entries
+        readOp = createReadOp(lh, 8, 3);
+        readOp.submit();
+
+        int index = 8;
+        int numReads = 0;
+        Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
+        while (iterator.hasNext()) {
+            LedgerEntry entry = iterator.next();
+            assertNotNull(entry);
+            assertEquals(index, Integer.parseInt(new 
String(entry.getEntryBytes())));
+            entry.close();
+            ++index;
+            ++numReads;
+        }
+        assertEquals(2, numReads);
+        lh.close();
+    }
+
+    @Test
+    public void testFailRecoveryReadMissingEntryImmediately() throws Exception 
{
+        int numEntries = 1;
+
+        long id = getLedgerToRead(5, 5, 3, numEntries);
+
+        ClientConfiguration newConf = new ClientConfiguration()
+            .setReadEntryTimeout(30000);
+        newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper newBk = new BookKeeper(newConf);
+
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(10);
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        // sleep two bookie
+        sleepBookie(ensemble.get(0), latch1);
+        sleepBookie(ensemble.get(1), latch2);
+
+        BatchedReadOp readOp = createRecoveryReadOp(lh, 10, 1);
+        readOp.submit();
+        // would fail immediately if found missing entries don't cover ack 
quorum
+        expectFail(readOp.future(), Code.NoSuchEntryException);
+        latch1.countDown();
+        latch2.countDown();
+
+        lh.close();
+        newBk.close();
+    }
+
+    @Test
+    public void testReadWithFailedBookies() throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 3, 3, numEntries);
+
+        ClientConfiguration newConf = new ClientConfiguration()
+            .setReadEntryTimeout(30000);
+        newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper newBk = new BookKeeper(newConf);
+
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
+        // kill two bookies
+        killBookie(ensemble.get(0));
+        killBookie(ensemble.get(1));
+
+        // read multiple entries, because the ensemble is not equals with 
write quorum, the return entries
+        // will less than max count.
+        int numReads = 0;
+        for (int i = 0; i < numEntries;) {
+            BatchedReadOp readOp = createReadOp(lh, i, numEntries);
+            readOp.submit();
+            Iterator<LedgerEntry> entries = readOp.future().get().iterator();
+            if (!entries.hasNext()) {
+                i++;
+                continue;
+            }
+            while (entries.hasNext()) {
+                LedgerEntry entry = entries.next();
+                assertNotNull(entry);
+                assertEquals(i, Integer.parseInt(new 
String(entry.getEntryBytes())));
+                entry.close();
+                i++;
+                numReads++;
+            }
+        }
+        assertEquals(10, numReads);
+        lh.close();
+        newBk.close();
+    }
+
+    @Test
+    public void testReadFailureWithFailedBookies() throws Exception {
+        int numEntries = 10;
+
+        long id = getLedgerToRead(5, 3, 3, numEntries);
+
+        ClientConfiguration newConf = new ClientConfiguration()
+            .setReadEntryTimeout(30000);
+        newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper newBk = new BookKeeper(newConf);
+
+        LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
+        // kill two bookies
+        killBookie(ensemble.get(0));
+        killBookie(ensemble.get(1));
+        killBookie(ensemble.get(2));
+
+        // read multiple entries
+        BatchedReadOp readOp = createReadOp(lh, 0,  numEntries);
+        readOp.submit();
+        expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
+
+        lh.close();
+        newBk.close();
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
new file mode 100644
index 0000000000..3bf5e2d5e4
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java
@@ -0,0 +1,401 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static 
org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.BitSet;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Unit tests for the client-side speculative request behaviour of the
 * batch read API ({@code LedgerHandle#asyncBatchReadEntries}).
 *
 */
public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeBatchRead.class);

    private final DigestType digestType;
    byte[] passwd = "specPW".getBytes();

    public TestSpeculativeBatchRead() {
        // 10-bookie cluster so 5-bookie ensembles still have spare bookies.
        super(10);
        this.digestType = DigestType.CRC32;
    }

    /**
     * Creates a ledger with the given ensemble/quorum size, writes ten
     * entries to it, closes it, and returns its id.
     */
    long getLedgerToRead(int ensemble, int quorum) throws Exception {
        byte[] data = "Data for test".getBytes();
        LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd);
        for (int i = 0; i < 10; i++) {
            l.addEntry(data);
        }
        l.close();

        return l.getId();
    }

    /**
     * Builds a client whose speculative read timeout is {@code specTimeout}
     * milliseconds (0 disables speculation) and whose ordinary read timeout
     * is long (30s), so only speculation can rescue a read stuck on a
     * sleeping bookie.
     */
    @SuppressWarnings("deprecation")
    BookKeeperTestClient createClient(int specTimeout) throws Exception {
        ClientConfiguration conf = new ClientConfiguration()
            .setSpeculativeReadTimeout(specTimeout)
            .setReadTimeout(30000)
            .setUseV2WireProtocol(true)
            .setReorderReadSequenceEnabled(true)
            .setEnsemblePlacementPolicySlowBookies(true)
            .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        return new BookKeeperTestClient(conf, new TestStatsProvider());
    }

    /**
     * Read callback that records the outcome and completion latency of one
     * read and exposes latch-based expectation helpers for the tests.
     */
    class LatchCallback implements ReadCallback {
        CountDownLatch l = new CountDownLatch(1);
        boolean success = false;
        long startMillis = System.currentTimeMillis();
        long endMillis = Long.MAX_VALUE;

        public void readComplete(int rc,
                                 LedgerHandle lh,
                                 Enumeration<LedgerEntry> seq,
                                 Object ctx) {
            endMillis = System.currentTimeMillis();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got response {} {}", rc, getDuration());
            }
            success = rc == BKException.Code.OK;
            l.countDown();
        }

        // Wall-clock time from construction to readComplete.
        long getDuration() {
            return endMillis - startMillis;
        }

        // NOTE(review): unlike expectFail/expectTimeout this only prints the
        // await result and never asserts it (nor checks `success`) -- confirm
        // whether the assertions were deliberately relaxed.
        void expectSuccess(int milliseconds) throws Exception {
            boolean await = l.await(milliseconds, TimeUnit.MILLISECONDS);
            System.out.println(await);
        }

        void expectFail(int milliseconds) throws Exception {
            assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
            assertFalse(success);
        }

        void expectTimeout(int milliseconds) throws Exception {
            assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Test basic speculative functionality.
     * - Create 2 clients with read timeout disabled, one with spec
     *   read enabled, the other not.
     * - create ledger
     * - sleep second bookie in ensemble
     * - read first entry, both should find on first bookie.
     * - read second entry, spec client should find on bookie three,
     *   non spec client should hang.
     */
    @Test
    public void testSpeculativeRead() throws Exception {
        long id = getLedgerToRead(3, 2);
        BookKeeperTestClient bknospec = createClient(0); // disabled
        BookKeeperTestClient bkspec = createClient(2000);

        LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd);
        LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd);

        // sleep second bookie
        CountDownLatch sleepLatch = new CountDownLatch(1);
        BookieId second = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1);
        sleepBookie(second, sleepLatch);

        try {
            // read first entry, both go to first bookie, should be fine
            LatchCallback nospeccb = new LatchCallback();
            LatchCallback speccb = new LatchCallback();
            lnospec.asyncBatchReadEntries(0, 1, 1024, nospeccb, null);
            lspec.asyncBatchReadEntries(0, 1, 1024, speccb, null);
            nospeccb.expectSuccess(2000);
            speccb.expectSuccess(2000);

            // read second entry, both look for the second bookie; spec read client
            // tries third bookie, nonspec client hangs as read timeout is very long.
            // NOTE(review): these two calls use the plain (non-batch)
            // asyncReadEntries -- confirm whether asyncBatchReadEntries was
            // intended here, as used everywhere else in this class.
            nospeccb = new LatchCallback();
            speccb = new LatchCallback();
            lnospec.asyncReadEntries(1, 1, nospeccb, null);
            lspec.asyncReadEntries(1, 1, speccb, null);
            speccb.expectSuccess(4000);
            nospeccb.expectTimeout(4000);
            // Check that the second bookie is registered as slow at entryId 1
            RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy();
            assertTrue(rep.slowBookies.asMap().size() == 1);

            assertTrue(
                    "Stats should not reflect speculative reads if disabled",
                    bknospec.getTestStatsProvider()
                            .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() == 0);
            assertTrue(
                    "Stats should reflect speculative reads",
                    bkspec.getTestStatsProvider()
                            .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() > 0);
        } finally {
            sleepLatch.countDown();
            lspec.close();
            lnospec.close();
            bkspec.close();
            bknospec.close();
        }
    }

    /**
     * Test that if more than one replica is down, we can still read, as long as the quorum
     * size is larger than the number of down replicas.
     */
    @Test
    public void testSpeculativeReadMultipleReplicasDown() throws Exception {
        long id = getLedgerToRead(5, 5);
        int timeout = 5000;
        BookKeeper bkspec = createClient(timeout);

        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

        // sleep bookie 1, 2 & 4
        CountDownLatch sleepLatch = new CountDownLatch(1);
        sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch);
        sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2), sleepLatch);
        sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(4), sleepLatch);

        try {
            // read first entry, should complete faster than timeout
            // as bookie 0 has the entry
            LatchCallback latch0 = new LatchCallback();
            l.asyncBatchReadEntries(0, 1, 1024, latch0, null);
            latch0.expectSuccess(timeout / 2);

            // second should have to hit two timeouts (bookie 1 & 2)
            // bookie 3 has the entry
            LatchCallback latch1 = new LatchCallback();
            l.asyncBatchReadEntries(1, 1, 1024, latch1, null);
            latch1.expectTimeout(timeout);
            latch1.expectSuccess(timeout * 2);
            LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration());
            assertTrue("should have taken longer than two timeouts, but less than 3",
                       latch1.getDuration() >= timeout * 2
                       && latch1.getDuration() < timeout * 3);

            // bookies 1 & 2 should be registered as slow bookies because of speculative reads
            Set<BookieId> expectedSlowBookies = new HashSet<>();
            expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1));
            expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2));
            assertEquals(((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(),
                expectedSlowBookies);

            // third should not hit timeouts since bookies 1 & 2 are registered as slow
            // bookie 3 has the entry
            LatchCallback latch2 = new LatchCallback();
            l.asyncBatchReadEntries(2, 1, 1024, latch2, null);
            latch2.expectSuccess(timeout);

            // fourth should have no timeout
            // bookie 3 has the entry
            LatchCallback latch3 = new LatchCallback();
            l.asyncBatchReadEntries(3, 1, 1024, latch3, null);
            latch3.expectSuccess(timeout / 2);

            // fifth should hit one timeout, (bookie 4)
            // bookie 0 has the entry
            LatchCallback latch4 = new LatchCallback();
            l.asyncBatchReadEntries(4, 1, 1024, latch4, null);
            latch4.expectTimeout(timeout / 2);
            latch4.expectSuccess(timeout);
            LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration());
            assertTrue("should have taken longer than one timeout, but less than 2",
                       latch4.getDuration() >= timeout
                       && latch4.getDuration() < timeout * 2);
        } finally {
            sleepLatch.countDown();
            l.close();
            bkspec.close();
        }
    }

    /**
     * Test that if after a speculative read is kicked off, the original read completes
     * nothing bad happens.
     */
    @Test
    public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
        long id = getLedgerToRead(2, 2);
        int timeout = 1000;
        BookKeeper bkspec = createClient(timeout);

        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

        // sleep bookies
        CountDownLatch sleepLatch0 = new CountDownLatch(1);
        CountDownLatch sleepLatch1 = new CountDownLatch(1);
        sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(0), sleepLatch0);
        sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch1);

        try {
            // read goes to first bookie, spec read timeout occurs,
            // goes to second
            LatchCallback latch0 = new LatchCallback();
            l.asyncBatchReadEntries(0, 1, 1024, latch0, null);
            latch0.expectTimeout(timeout);

            // wake up first bookie
            sleepLatch0.countDown();
            latch0.expectSuccess(timeout / 2);

            sleepLatch1.countDown();

            // check we can read next entry without issue
            LatchCallback latch1 = new LatchCallback();
            l.asyncBatchReadEntries(1, 1, 1024, latch1, null);
            latch1.expectSuccess(timeout / 2);
        } finally {
            sleepLatch0.countDown();
            sleepLatch1.countDown();
            l.close();
            bkspec.close();
        }
    }

    /**
     * Unit test to check if the scheduled speculative task gets cancelled
     * on successful read.
     */
    @Test
    public void testSpeculativeReadScheduledTaskCancel() throws Exception {
        long id = getLedgerToRead(3, 2);
        int timeout = 1000;
        BookKeeper bkspec = createClient(timeout);
        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
        BatchedReadOp op = null;
        try {
            // Batch read of 5 entries (max 5120 bytes) starting at entry 0.
            op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false);
            op.initiate();
            op.future().get();
        } finally {
            // NOTE(review): if the BatchedReadOp constructor throws, `op` is
            // still null here and this assertion itself NPEs -- confirm this
            // is acceptable for a test-only finally block.
            assertNull("Speculative Read tasks must be null", op.getSpeculativeTask());
        }
    }

    /**
     * Unit test for the speculative read scheduling method.
     */
    @Test
    public void testSpeculativeReadScheduling() throws Exception {
        long id = getLedgerToRead(3, 2);
        int timeout = 1000;
        BookKeeper bkspec = createClient(timeout);

        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);

        List<BookieId> ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L);
        BitSet allHosts = new BitSet(ensemble.size());
        for (int i = 0; i < ensemble.size(); i++) {
            allHosts.set(i, true);
        }
        BitSet noHost = new BitSet(ensemble.size());
        BitSet secondHostOnly = new BitSet(ensemble.size());
        secondHostOnly.set(1, true);
        BatchedReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
        try {
            BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false);
            // if we've already heard from all hosts,
            // we only send the initial read
            req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024);
            assertTrue("Should have sent to first",
                       req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
            assertNull("Should not have sent another",
                       req0.maybeSendSpeculativeRead(allHosts));

            // if we have heard from some hosts, but not one we have sent to
            // send again
            req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2, 1, 1024);
            assertTrue("Should have sent to third",
                       req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
            assertTrue("Should have sent to first",
                       req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0)));

            // if we have heard from some hosts, which includes one we sent to
            // do not read again
            req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4, 1, 1024);
            assertTrue("Should have sent to second",
                       req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
            assertNull("Should not have sent another",
                       req4.maybeSendSpeculativeRead(secondHostOnly));
        } finally {
            // Let any in-flight speculative requests finish (poll up to ~10s)
            // before closing, so teardown doesn't race with them.
            for (BatchedReadOp.LedgerEntryRequest req
                     : new BatchedReadOp.LedgerEntryRequest[] { req0, req2, req4 }) {
                if (req != null) {
                    int i = 0;
                    while (!req.isComplete()) {
                        if (i++ > 10) {
                            break; // wait for up to 10 seconds
                        }
                        Thread.sleep(1000);
                    }
                    assertTrue("Request should be done", req.isComplete());
                }
            }

            l.close();
            bkspec.close();
        }
    }

    // Verifies a SequenceReadRequest gets a non-null write set when the
    // placement policy is LocalBookieEnsemblePlacementPolicy.
    @Test
    public void testSequenceReadLocalEnsemble() throws Exception {
        ClientConfiguration conf = new ClientConfiguration()
                .setSpeculativeReadTimeout(1000)
                .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class)
                .setReorderReadSequenceEnabled(true)
                .setEnsemblePlacementPolicySlowBookies(true)
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        BookKeeper bkspec = new BookKeeperTestClient(conf, new TestStatsProvider());
        LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd);
        List<BookieId> ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L);
        BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false);
        BatchedReadOp.LedgerEntryRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024);
        assertNotNull(req0.writeSet);
    }
}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 5b96c52a0b..60f89159a0 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -359,7 +359,7 @@ public class BookieClientTest {
     }
 
     @Test
-    public void testBatchedRead() throws Exception {
+    public void testBatchRead() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setUseV2WireProtocol(true);
         BookieClient bc = new BookieClientImpl(conf, eventLoopGroup,

Reply via email to