This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new cc84042  ISSUE #709: Add Slow Bookkeeper Servers to Placement Policy for read ordering
cc84042 is described below

commit cc84042a4d494fde42d215ead28093033da6861a
Author: Philip Su <[email protected]>
AuthorDate: Tue Dec 19 10:44:41 2017 -0800

    ISSUE #709: Add Slow Bookkeeper Servers to Placement Policy for read ordering
    
    Descriptions of the changes in this PR:
    
    Maintain a list of slow bookkeeper hosts, which are tried only after read-only bookkeeper hosts have been tried.
    A bookkeeper host is categorized as "slow" when a speculative timeout occurs on that host.
    
    Master Issue: #709
    
    Author: Philip Su <[email protected]>
    Author: Sijie Guo <[email protected]>
    Author: philipsu522 <[email protected]>
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Ivan Kelly <[email protected]>, Yiming Zang 
<[email protected]>, Jia Zhai <None>, Enrico Olivelli <[email protected]>
    
    This closes #883 from philipsu522/psu/ISSUE-709-2, closes #709
---
 .../bookie/LocalBookieEnsemblePlacementPolicy.java |  10 +-
 .../bookkeeper/client/BookiesHealthInfo.java       |  47 ++++++
 .../client/DefaultEnsemblePlacementPolicy.java     |   9 +-
 .../bookkeeper/client/EnsemblePlacementPolicy.java |  27 +++-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  24 +++
 .../apache/bookkeeper/client/PendingReadOp.java    |  19 ++-
 .../client/RackawareEnsemblePlacementPolicy.java   |   8 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      | 151 ++++++++++++++++--
 .../client/ReadLastConfirmedAndEntryOp.java        |  17 ++-
 .../client/RegionAwareEnsemblePlacementPolicy.java | 110 ++-----------
 .../TopologyAwareEnsemblePlacementPolicy.java      |   7 +-
 .../bookkeeper/conf/ClientConfiguration.java       |  23 +++
 .../org/apache/bookkeeper/proto/BookieClient.java  |   2 +-
 .../proto/DefaultPerChannelBookieClientPool.java   |   9 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   |   4 +
 .../proto/PerChannelBookieClientPool.java          |   6 +-
 .../TestRackawareEnsemblePlacementPolicy.java      | 157 ++++++++++++++++++-
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 170 +++++++++++++++++++--
 .../bookkeeper/client/TestSpeculativeRead.java     |  25 ++-
 19 files changed, 662 insertions(+), 163 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 2e87398..7c6bad9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.BookiesHealthInfo;
 import org.apache.bookkeeper.client.DistributionSchedule;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -88,9 +89,14 @@ public class LocalBookieEnsemblePlacementPolicy implements 
EnsemblePlacementPoli
     }
 
     @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        return;
+    }
+
+    @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
@@ -98,7 +104,7 @@ public class LocalBookieEnsemblePlacementPolicy implements 
EnsemblePlacementPoli
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
new file mode 100644
index 0000000..b040440
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * This interface returns heuristics used to determine the health of a 
Bookkeeper server for read
+ * ordering.
+ */
+public interface BookiesHealthInfo {
+
+    /**
+     * Return the failure history for a bookie.
+     *
+     * @param bookieSocketAddress
+     * @return failed entries on a bookie, -1 if there have been no failures
+     */
+    long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress);
+
+    /**
+     * Returns pending requests to a bookie.
+     *
+     * @param bookieSocketAddress
+     * @return number of pending requests
+     */
+    long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress);
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index eab8954..219086d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -153,9 +153,14 @@ public class DefaultEnsemblePlacementPolicy implements 
EnsemblePlacementPolicy {
     }
 
     @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        return;
+    }
+
+    @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -163,7 +168,7 @@ public class DefaultEnsemblePlacementPolicy implements 
EnsemblePlacementPolicy {
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         writeSet.addMissingIndices(ensemble.size());
         return writeSet;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 855ee75..7656a49 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -20,12 +20,12 @@ package org.apache.bookkeeper.client;
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -151,7 +151,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
  *
  * <h3>How to choose bookies to do speculative reads?</h3>
  *
- * <p>{@link #reorderReadSequence(ArrayList, List, Map)} and {@link 
#reorderReadLACSequence(ArrayList, List, Map)} are
+ * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, WriteSet)} and
+ * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, WriteSet)} are
  * two methods exposed by the placement policy, to help client determine a 
better read sequence according to the
  * network topology and the bookie failure history.
  *
@@ -290,12 +291,22 @@ public interface EnsemblePlacementPolicy {
         throws BKNotEnoughBookiesException;
 
     /**
+     * Register a bookie as slow so that it is tried after available and 
read-only bookies.
+     *
+     * @param bookieSocketAddress
+     *          Address of bookie host
+     * @param entryId
+     *          Entry ID that caused a speculative timeout on the bookie.
+     */
+    void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long 
entryId);
+
+    /**
      * Reorder the read sequence of a given write quorum <i>writeSet</i>.
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather 
than
      *          allocating a new WriteSet.
@@ -305,7 +316,7 @@ public interface EnsemblePlacementPolicy {
      */
     DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
 
@@ -314,8 +325,8 @@ public interface EnsemblePlacementPolicy {
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather 
than
      *          allocating a new WriteSet.
@@ -325,7 +336,7 @@ public interface EnsemblePlacementPolicy {
      */
     DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 32eeab8..d761530 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -77,6 +77,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
@@ -106,6 +107,7 @@ public class LedgerHandle implements WriteHandle {
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    final BookiesHealthInfo bookiesHealthInfo;
     final EnumSet<WriteFlag> writeFlags;
     ScheduledFuture<?> timeoutFuture = null;
 
@@ -174,6 +176,19 @@ public class LedgerHandle implements WriteHandle {
                 return -1L;
             }
         });
+        this.bookiesHealthInfo = new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                Long lastFailure = 
bookieFailureHistory.getIfPresent(bookieSocketAddress);
+                return lastFailure == null ? -1L : lastFailure;
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                PerChannelBookieClientPool pcbcPool = 
bk.bookieClient.lookupClient(bookieSocketAddress);
+                return pcbcPool == null ? 0 : 
pcbcPool.getNumPendingCompletionRequests();
+            }
+        };
 
         ensembleChangeCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -333,6 +348,15 @@ public class LedgerHandle implements WriteHandle {
         return distributionSchedule;
     }
 
+    /**
+     * Get the health info for bookies for this ledger.
+     *
+     * @return BookiesHealthInfo for every bookie in the write set.
+     */
+    BookiesHealthInfo getBookiesHealthInfo() {
+        return bookiesHealthInfo;
+    }
+
     void writeLedgerConfig(GenericCallback<Void> writeCb) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Writing metadata to ledger manager: {}, {}", 
this.ledgerId, metadata.getVersion());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 2920231..32816bf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -86,16 +86,18 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final LedgerEntryImpl entryImpl;
+        final long eId;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
+            this.eId = eId;
 
             if (lh.bk.isReorderReadSequence()) {
                 writeSet = lh.bk.getPlacementPolicy()
                     .reorderReadSequence(
                             ensemble,
-                            lh.bookieFailureHistory.asMap(),
+                            lh.getBookiesHealthInfo(),
                             lh.distributionSchedule.getWriteSet(eId));
             } else {
                 writeSet = lh.distributionSchedule.getWriteSet(eId);
@@ -420,6 +422,21 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf 
buffer) {
+            boolean completed = super.complete(bookieIndex, host, buffer);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any 
slow bookies before
+                // the first successful speculative read as "slow"
+                for (int i = 0; i < numReplicasTried - 1; i++) {
+                    int slowBookieIndex = writeSet.get(i);
+                    BookieSocketAddress slowBookieSocketAddress = 
ensemble.get(slowBookieIndex);
+                    
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, eId);
+                }
+            }
+            return completed;
+        }
     }
 
     PendingReadOp(LedgerHandle lh,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index db8797a..1691f1e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -121,18 +121,18 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadSequence(ensemble, bookiesHealthInfo,
                                          writeSet);
     }
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
                                             writeSet);
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index f0572a6..735324b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -18,7 +18,11 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
@@ -37,6 +41,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
@@ -87,7 +92,8 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
     static final int REMOTE_MASK      = 0x04 << 24;
     static final int REMOTE_FAIL_MASK = 0x08 << 24;
     static final int READ_ONLY_MASK   = 0x10 << 24;
-    static final int UNAVAIL_MASK     = 0x20 << 24;
+    static final int SLOW_MASK        = 0x20 << 24;
+    static final int UNAVAIL_MASK     = 0x40 << 24;
     static final int MASK_BITS        = 0xFFF << 20;
 
     static class DefaultResolver implements DNSToSwitchMapping {
@@ -183,6 +189,8 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
     protected DNSToSwitchMapping dnsResolver;
     protected HashedWheelTimer timer;
     protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+    // Use a loading cache so slow bookies are expired. Use entryId as values.
+    protected Cache<BookieSocketAddress, Long> slowBookies;
     protected BookieNode localNode;
     protected final ReentrantReadWriteLock rwLock;
     protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
@@ -302,6 +310,14 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
             }
         }
+        slowBookies = CacheBuilder.newBuilder()
+            .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), 
TimeUnit.MILLISECONDS)
+            .build(new CacheLoader<BookieSocketAddress, Long>() {
+                @Override
+                public Long load(BookieSocketAddress key) throws Exception {
+                    return -1L;
+                }
+            });
         return initialize(
                 dnsResolver,
                 timer,
@@ -790,34 +806,104 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
     }
 
     @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        slowBookies.put(bookieSocketAddress, entryId);
+    }
+
+    @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
+        Map<Integer, String> writeSetWithRegion = new HashMap<>();
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSetWithRegion.put(writeSet.get(i), "");
+        }
+        return reorderReadSequenceWithRegion(
+            ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false, 
"", writeSet.size());
+    }
+
+    /**
+     * This function orders the read sequence with a given region. For 
region-unaware policies (e.g.
+     * RackAware), we pass in false for regionAware and an empty myRegion. 
When this happens, any
+     * remote list will stay empty. The ordering is as follows (the R* at the 
beginning of each list item
+     * is only present for region aware policies).
+     *      1. available (local) bookies
+     *      2. R* a remote bookie (based on remoteNodeInReorderSequence)
+     *      3. R* remaining (local) bookies
+     *      4. R* remaining remote bookies
+     *      5. read only bookies
+     *      6. slow bookies
+     *      7. unavailable bookies
+     *
+     * @param ensemble
+     *          ensemble of bookies
+     * @param writeSet
+     *          write set
+     * @param writeSetWithRegion
+     *          write set with region information
+     * @param bookiesHealthInfo
+     *          heuristics about health of bookies
+     * @param regionAware
+     *          whether or not a region-aware policy is used
+     * @param myRegion
+     *          current region of policy
+     * @param remoteNodeInReorderSequence
+     *          number of local bookies to try before trying a remote bookie
+     * @return ordering of bookies to send read to
+     */
+    DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
+        ArrayList<BookieSocketAddress> ensemble,
+        DistributionSchedule.WriteSet writeSet,
+        Map<Integer, String> writeSetWithRegion,
+        BookiesHealthInfo bookiesHealthInfo,
+        boolean regionAware,
+        String myRegion,
+        int remoteNodeInReorderSequence) {
+        boolean useRegionAware = regionAware && 
(!myRegion.equals(UNKNOWN_REGION));
         int ensembleSize = ensemble.size();
 
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             BookieSocketAddress address = ensemble.get(idx);
-            Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+            String region = writeSetWithRegion.get(idx);
+            Long lastFailedEntryOnBookie = 
bookiesHealthInfo.getBookieFailureHistory(address);
             if (null == knownBookies.get(address)) {
                 // there isn't too much differences between readonly bookies
                 // from unavailable bookies. since there
                 // is no write requests to them, so we shouldn't try reading
-                // from readonly bookie in prior to writable
-                // bookies.
+                // from readonly bookie prior to writable bookies.
                 if ((null == readOnlyBookies)
-                        || !readOnlyBookies.contains(address)) {
+                    || !readOnlyBookies.contains(address)) {
                     writeSet.set(i, idx | UNAVAIL_MASK);
                 } else {
-                    writeSet.set(i, idx | READ_ONLY_MASK);
+                    if (slowBookies.getIfPresent(address) != null) {
+                        long numPendingReqs = 
bookiesHealthInfo.getBookiePendingRequests(address);
+                        // use slow bookies with less pending requests first
+                        long slowIdx = numPendingReqs * ensembleSize + idx;
+                        writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | 
SLOW_MASK);
+                    } else {
+                        writeSet.set(i, idx | READ_ONLY_MASK);
+                    }
+                }
+            } else if (lastFailedEntryOnBookie < 0) {
+                if (slowBookies.getIfPresent(address) != null) {
+                    long numPendingReqs = 
bookiesHealthInfo.getBookiePendingRequests(address);
+                    long slowIdx = numPendingReqs * ensembleSize + idx;
+                    writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
+                } else {
+                    if (useRegionAware && !myRegion.equals(region)) {
+                        writeSet.set(i, idx | REMOTE_MASK);
+                    } else {
+                        writeSet.set(i, idx | LOCAL_MASK);
+                    }
                 }
             } else {
-                if ((lastFailedEntryOnBookie == null)
-                        || (lastFailedEntryOnBookie < 0)) {
-                    writeSet.set(i, idx | LOCAL_MASK);
+                // use bookies with earlier failed entryIds first
+                long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
+                if (useRegionAware && !myRegion.equals(region)) {
+                    writeSet.set(i, (int) (failIdx & ~MASK_BITS) | 
REMOTE_FAIL_MASK);
                 } else {
-                    long failIdx = lastFailedEntryOnBookie * ensembleSize + 
idx;
                     writeSet.set(i, (int) (failIdx & ~MASK_BITS) | 
LOCAL_FAIL_MASK);
                 }
             }
@@ -836,15 +922,54 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
 
         if (reorderReadsRandom) {
             shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+            shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
             shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
             shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
         }
 
-        // remove all masks
+        // nodes within a region are ordered as follows
+        // (Random?) list of nodes that have no history of failure
+        // Nodes with Failure history are ordered in the reverse
+        // order of the most recent entry that generated an error
+        // The sort will have put them in correct order,
+        // so remove the bits that sort by age.
+        for (int i = 0; i < writeSet.size(); i++) {
+            int mask = writeSet.get(i) & MASK_BITS;
+            int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
+            if (mask == LOCAL_FAIL_MASK) {
+                writeSet.set(i, LOCAL_MASK | idx);
+            } else if (mask == REMOTE_FAIL_MASK) {
+                writeSet.set(i, REMOTE_MASK | idx);
+            } else if (mask == SLOW_MASK) {
+                writeSet.set(i, SLOW_MASK | idx);
+            }
+        }
+
+        // Insert a node from the remote region at the specified location so
+        // we try more than one region within the max allowed latency
+        int firstRemote = -1;
         for (int i = 0; i < writeSet.size(); i++) {
-            writeSet.set(i, (writeSet.get(i) & ~MASK_BITS) % ensembleSize);
+            if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
+                firstRemote = i;
+                break;
+            }
+        }
+        if (firstRemote != -1) {
+            int i = 0;
+            for (; i < remoteNodeInReorderSequence
+                && i < writeSet.size(); i++) {
+                if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
+                    break;
+                }
+            }
+            writeSet.moveAndShift(firstRemote, i);
         }
 
+
+        // remove all masks
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+        }
         return writeSet;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index fe2f64a..07cb318 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -83,7 +83,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
             this.writeSet = lh.distributionSchedule.getWriteSet(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = 
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
-                        lh.bookieFailureHistory.asMap(), writeSet.copy());
+                        lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
             }
@@ -407,6 +407,21 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf 
buffer, long entryId) {
+            boolean completed = super.complete(bookieIndex, host, buffer, 
entryId);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any 
bookies before the
+                // first speculative read as slow
+                for (int i = 0; i < numReplicasTried; i++) {
+                    int slowBookieIndex = orderedEnsemble.get(i);
+                    BookieSocketAddress slowBookieSocketAddress = 
ensemble.get(slowBookieIndex);
+                    
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+                }
+            }
+            return completed;
+        }
     }
 
     ReadLastConfirmedAndEntryOp(LedgerHandle lh,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index ba8798e..f990b3f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -521,122 +521,30 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
     @Override
     public final DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadSequence(ensemble, bookieFailureHistory,
-                                             writeSet);
+            return super.reorderReadSequence(ensemble, bookiesHealthInfo, 
writeSet);
         } else {
-            int ensembleSize = ensemble.size();
-
+            Map<Integer, String> writeSetWithRegion = new HashMap<>();
             for (int i = 0; i < writeSet.size(); i++) {
                 int idx = writeSet.get(i);
-                BookieSocketAddress address = ensemble.get(idx);
-                String region = getRegion(address);
-                Long lastFailedEntryOnBookie = 
bookieFailureHistory.get(address);
-                if (null == knownBookies.get(address)) {
-                    // there isn't too much differences between readonly 
bookies
-                    // from unavailable bookies. since there
-                    // is no write requests to them, so we shouldn't try 
reading
-                    // from readonly bookie in prior to writable bookies.
-                    if ((null == readOnlyBookies)
-                            || !readOnlyBookies.contains(address)) {
-                        writeSet.set(i, idx | UNAVAIL_MASK);
-                    } else {
-                        writeSet.set(i, idx | READ_ONLY_MASK);
-                    }
-                } else if (region.equals(myRegion)) {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | LOCAL_MASK);
-                    } else {
-                        long failIdx = lastFailedEntryOnBookie * ensembleSize 
+ idx;
-                        writeSet.set(i, (int) (failIdx & ~MASK_BITS) | 
LOCAL_FAIL_MASK);
-                    }
-                } else {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | REMOTE_MASK);
-                    } else {
-                        long failIdx = lastFailedEntryOnBookie * ensembleSize 
+ idx;
-                        writeSet.set(i, (int) (failIdx & ~MASK_BITS)
-                                     | REMOTE_FAIL_MASK);
-                    }
-                }
-            }
-
-            // Add a mask to ensure the sort is stable, sort,
-            // and then remove mask. This maintains stability as
-            // long as there are fewer than 16 bookies in the write set.
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
-            }
-            writeSet.sort();
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
-            }
-
-            if (reorderReadsRandom) {
-                shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
-            }
-
-            // nodes within a region are ordered as follows
-            // (Random?) list of nodes that have no history of failure
-            // Nodes with Failure history are ordered in the reverse
-            // order of the most recent entry that generated an error
-            // The sort will have put them in correct order,
-            // so remove the bits that sort by age.
-            for (int i = 0; i < writeSet.size(); i++) {
-                int mask = writeSet.get(i) & MASK_BITS;
-                int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
-                if (mask == LOCAL_FAIL_MASK) {
-                    writeSet.set(i, LOCAL_MASK | idx);
-                } else if (mask == REMOTE_FAIL_MASK) {
-                    writeSet.set(i, REMOTE_MASK | idx);
-                }
-            }
-
-            // Insert a node from the remote region at the specified location 
so
-            // we try more than one region within the max allowed latency
-            int firstRemote = -1;
-            for (int i = 0; i < writeSet.size(); i++) {
-                if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
-                    firstRemote = i;
-                    break;
-                }
-            }
-            if (firstRemote != -1) {
-                int i = 0;
-                for (; i < REMOTE_NODE_IN_REORDER_SEQUENCE && i < 
writeSet.size(); i++) {
-                    if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
-                        break;
-                    }
-                }
-                writeSet.moveAndShift(firstRemote, i);
-            }
-
-
-            // remove all masks
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+                writeSetWithRegion.put(idx, getRegion(ensemble.get(idx)));
             }
-            return writeSet;
+            return super.reorderReadSequenceWithRegion(ensemble, writeSet, 
writeSetWithRegion,
+                bookiesHealthInfo, true, myRegion, 
REMOTE_NODE_IN_REORDER_SEQUENCE);
         }
     }
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
-                                                writeSet);
+            return super.reorderReadLACSequence(ensemble, bookiesHealthInfo, 
writeSet);
         }
-        DistributionSchedule.WriteSet finalList = 
reorderReadSequence(ensemble, bookieFailureHistory, writeSet);
+        DistributionSchedule.WriteSet finalList = 
reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
         finalList.addMissingIndices(ensemble.size());
         return finalList;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 88953b3..b70cb0f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -461,7 +460,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -469,10 +468,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         DistributionSchedule.WriteSet retList = reorderReadSequence(
-                ensemble, bookieFailureHistory, writeSet);
+                ensemble, bookiesHealthInfo, writeSet);
         retList.addMissingIndices(ensemble.size());
         return retList;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 257c367..4d9c774 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -112,6 +112,8 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     // Ensemble Placement Policy
     protected static final String ENSEMBLE_PLACEMENT_POLICY = 
"ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = 
"networkTopologyStabilizePeriodSeconds";
+    protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES 
=
+        "ensemblePlacementPolicyOrderSlowBookies";
 
     // Ledger Metadata Parameters
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = 
"storeSystemTimeAsLedgerCreationTime";
@@ -1089,6 +1091,27 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Whether to order slow bookies in placement policy.
+     *
+     * @return flag of whether to order slow bookies in placement policy or 
not.
+     */
+    public boolean getEnsemblePlacementPolicySlowBookies() {
+        return getBoolean(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, false);
+    }
+
+    /**
+     * Enable/Disable ordering slow bookies in placement policy.
+     *
+     * @param enabled
+     *          flag to enable/disable ordering slow bookies in placement 
policy.
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnsemblePlacementPolicySlowBookies(boolean 
enabled) {
+        setProperty(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, enabled);
+        return this;
+    }
+
+    /**
      * Whether to enable recording task execution stats.
      *
      * @return flag to enable/disable recording task execution stats.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 9b1ca9c..08ed194 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -157,7 +157,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                                           authProviderFactory, registry, 
pcbcPool, shFactory);
     }
 
-    private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
+    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClientPool clientPool = channels.get(addr);
         if (null == clientPool) {
             closeLock.readLock().lock();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 41233cf..e57b8a7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -119,4 +119,13 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
             pcbc.close(wait);
         }
     }
+
+    @Override
+    public long getNumPendingCompletionRequests() {
+        long numPending = 0;
+        for (PerChannelBookieClient pcbc : clients) {
+            numPending += pcbc.getNumPendingCompletionRequests();
+        }
+        return numPending;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6e64b37..e0199ff 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -355,6 +355,10 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
     }
 
+    protected long getNumPendingCompletionRequests() {
+        return completionObjects.size();
+    }
+
     protected ChannelFuture connect() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 80f00a5..97a6a26 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -25,7 +25,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 /**
  * An interface to manage channel pooling for bookie client.
  */
-interface PerChannelBookieClientPool {
+public interface PerChannelBookieClientPool {
 
     /**
      * intialize the pool. the implementation should not be blocked.
@@ -67,4 +67,8 @@ interface PerChannelBookieClientPool {
      */
     void close(boolean wait);
 
+    /**
+     * Get the number of pending completion requests in the channel.
+     */
+    long getNumPendingCompletionRequests();
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1ce8f0c..637df1b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -104,6 +104,25 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         super.tearDown();
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, 
Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, 
Long> bookiePendingRequests) {
+        return new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, 
-1L);
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 
0L);
+            }
+        };
+    }
+
     static void updateMyRack(String rack) throws Exception {
         
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(), 
rack);
         
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostName(), rack);
@@ -131,7 +150,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(),
+                ensemble, getBookiesHealthInfo(),
                 writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 
3, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -162,7 +181,37 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 
3, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 
3, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -170,6 +219,38 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
     }
 
     @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        bookiePendingMap.put(addr2, 2L);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 
0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
         updateMyRack("/r1/rack1");
@@ -191,7 +272,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 
0, 1);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -221,13 +302,79 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         repp.onClusterChanged(addrs, roAddrs);
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 
1, 0);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
 
     @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        addrs.remove(addr2);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 
0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1);
+        addrs.remove(addr2);
+        Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr3, 1L);
+        addrs.remove(addr2);
+        repp.onClusterChanged(addrs, ro);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 
2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
     public void testReplaceBookieWithEnoughBookiesInSameRack() throws 
Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -738,7 +885,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         bookieFailures.put(addr2, 22L);
 
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet);
+                ensemble, getBookiesHealthInfo(bookieFailures, new 
HashMap<>()), writeSet);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(2)), addr1);
         assertEquals(ensemble.get(reoderSet.get(3)), addr2);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 1dcf0e1..fe75b31 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -118,6 +119,26 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         super.tearDown();
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, 
Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, 
Long> bookiePendingRequests) {
+        return new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, 
-1L);
+            }
+
+            @Override
+            public long getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 
0L);
+            }
+        };
+    }
+
+
     @Test
     public void testNotReorderReadIfInDefaultRack() throws Exception {
         repp.uninitalize();
@@ -128,7 +149,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         assertEquals(origWriteSet, reorderSet);
     }
 
@@ -151,7 +172,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), 
writeSet.copy());
+                ensemble, getBookiesHealthInfo(), writeSet.copy());
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(0, 3, 
1, 2);
         LOG.info("write set : {}", writeSet);
         LOG.info("reorder set : {}", reorderSet);
@@ -171,7 +192,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         LOG.info("reorder set : {}", reorderSet);
         assertEquals(origWriteSet, reorderSet);
     }
@@ -196,7 +217,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 
2, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -225,7 +246,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 
2, 0);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -233,6 +254,66 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
     }
 
     @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 
2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        bookiePendingMap.put(addr2, 2L);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 
0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
         updateMyRack("/r1/rack1");
@@ -253,7 +334,37 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L);
+        addrs.remove(addr2);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2, 0, 1);
         LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
@@ -261,6 +372,39 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     }
 
     @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1);
+        addrs.remove(addr2);
+        Set<BookieSocketAddress> ro = new HashSet<>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr3, 1L);
+        repp.onClusterChanged(addrs, ro);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
     public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -1068,11 +1212,11 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             }
 
             LOG.info("Reorder {} => {}.", origWriteSet, readSet);
@@ -1124,12 +1268,12 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1199,11 +1343,11 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             DistributionSchedule.WriteSet readSet;
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1293,7 +1437,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         LOG.info("write set : {}", writeSet2);
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet2);
+                ensemble, getBookiesHealthInfo(bookieFailures, new HashMap<>()), writeSet2);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(0)), addr6);
         assertEquals(ensemble.get(reoderSet.get(1)), addr7);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 41e5b2e..d692cea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -27,6 +28,8 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -69,7 +72,9 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase {
     BookKeeper createClient(int specTimeout) throws Exception {
         ClientConfiguration conf = new ClientConfiguration()
             .setSpeculativeReadTimeout(specTimeout)
-            .setReadTimeout(30000);
+            .setReadTimeout(30000)
+            .setReorderReadSequenceEnabled(true)
+            .setEnsemblePlacementPolicySlowBookies(true);
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         return new BookKeeper(conf);
     }
@@ -150,6 +155,10 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase {
             lspec.asyncReadEntries(1, 1, speccb, null);
             speccb.expectSuccess(4000);
             nospeccb.expectTimeout(4000);
+            // Check that the second bookie is registered as slow at entryId 1
+            RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy;
+            assertTrue(rep.slowBookies.asMap().size() == 1);
+            assertTrue(rep.slowBookies.asMap().get(second) == 1L);
         } finally {
             sleepLatch.countDown();
             lspec.close();
@@ -195,16 +204,18 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase {
                        latch1.getDuration() >= timeout * 2
                        && latch1.getDuration() < timeout * 3);
 
-            // third should have to hit one timeouts (bookie 2)
+            // bookies 1 & 2 should be registered as slow bookies because of speculative reads
+            Set<BookieSocketAddress> expectedSlowBookies = new HashSet<>();
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(1));
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(2));
+            assertEquals(((RackawareEnsemblePlacementPolicy) l.bk.placementPolicy).slowBookies.asMap().keySet(),
+                expectedSlowBookies);
+
+            // third should not hit timeouts since bookies 1 & 2 are registered as slow
             // bookie 3 has the entry
             LatchCallback latch2 = new LatchCallback();
             l.asyncReadEntries(2, 2, latch2, null);
-            latch2.expectTimeout(timeout / 2);
             latch2.expectSuccess(timeout);
-            LOG.info("Timeout {} latch2 duration {}", timeout, latch2.getDuration());
-            assertTrue("should have taken longer than one timeout, but less than 2",
-                       latch2.getDuration() >= timeout
-                       && latch2.getDuration() < timeout * 2);
 
             // fourth should have no timeout
             // bookie 3 has the entry

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to