philipsu522 closed pull request #776: Issue 709: Add Slow Bookkeeper Servers to 
Placement Policy for read ordering
URL: https://github.com/apache/bookkeeper/pull/776
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of a fork's pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 6bb8e5db2..481e5d36a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -23,11 +23,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookiesHealthInfo;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.DistributionSchedule;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -88,10 +88,15 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
         throw new BKNotEnoughBookiesException();
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        return;
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
@@ -99,7 +104,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
new file mode 100644
index 000000000..d0a846ccd
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * This interface returns heuristics used to determine the health of a 
Bookkeeper server for read
+ * ordering.
+ */
+public interface BookiesHealthInfo {
+
+  /**
+   * Return the failure history for a bookie.
+   *
+   * @param bookieSocketAddress
+   * @return failed entries on a bookie, -1 if there have been no failures.
+   */
+  public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress);
+
+  /**
+   * Returns pending requests to a bookie.
+   * @param bookieSocketAddress
+   * @return number of pending requests
+   */
+  public int getBookiePendingRequests(BookieSocketAddress bookieSocketAddress);
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index ffb229d6b..a2af2b727 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -145,10 +145,15 @@ public BookieSocketAddress replaceBookie(int 
ensembleSize, int writeQuorumSize,
         }
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        return;
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -156,7 +161,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         writeSet.addMissingIndices(ensemble.size());
         return writeSet;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index 6db2b6c2b..de7c870cc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -22,7 +22,7 @@
 import java.util.Map;
 
 /**
- * This interface determins how entries are distributed among bookies.
+ * This interface determines how entries are distributed among bookies.
  *
  * Every entry gets replicated to some number of replicas. The first replica 
for
  * an entry is given a replicaIndex of 0, and so on. To distribute write load,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 2301a9544..268a808fd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -20,7 +20,6 @@
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -151,7 +150,8 @@
  *
  * <h3>How to choose bookies to do speculative reads?</h3>
  *
- * <p>{@link #reorderReadSequence(ArrayList, List, Map)} and {@link 
#reorderReadLACSequence(ArrayList, List, Map)} are
+ * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, DistributionSchedule.WriteSet)} and
+ * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, DistributionSchedule.WriteSet)} are
  * two methods exposed by the placement policy, to help client determine a 
better read sequence according to the
  * network topology and the bookie failure history.
  *
@@ -289,13 +289,23 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
                                              Set<BookieSocketAddress> 
excludeBookies)
         throws BKNotEnoughBookiesException;
 
+    /**
+     * Register a bookie as slow so that it is tried after available and 
read-only bookies.
+     *
+     * @param bookieSocketAddress
+     *          Address of bookie host
+     * @param entryId
+     *          Entry ID that caused speculative timeout on the bookie.
+     */
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId);
+
     /**
      * Reorder the read sequence of a given write quorum <i>writeSet</i>.
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather 
than
      *          allocating a new WriteSet.
@@ -305,7 +315,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
      */
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
 
@@ -314,8 +324,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
      *
      * @param ensemble
      *          Ensemble to read entries.
-     * @param bookieFailureHistory
-     *          Observed failures on the bookies
+     * @param bookiesHealthInfo
+     *          Health info for bookies
      * @param writeSet
      *          Write quorum to read entries. This will be modified, rather 
than
      *          allocating a new WriteSet.
@@ -325,7 +335,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
      */
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 51efefd07..2d07eea08 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -37,6 +37,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -69,11 +70,13 @@
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
@@ -103,6 +106,7 @@
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    final BookiesHealthInfo bookiesHealthInfo;
 
     /**
      * Invalid entry id. This value is returned from methods which
@@ -168,6 +172,19 @@ public Long load(BookieSocketAddress key) {
                 return -1L;
             }
         });
+         this.bookiesHealthInfo = new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                Long lastFailure = 
bookieFailureHistory.getIfPresent(bookieSocketAddress);
+                return lastFailure == null ? -1L : lastFailure;
+            }
+
+            @Override
+            public int getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                PerChannelBookieClientPool pcbcPool = 
bk.bookieClient.lookupClient(bookieSocketAddress);
+                return pcbcPool == null ? 0 : 
pcbcPool.getNumPendingCompletionRequests();
+            }
+        };
 
         ensembleChangeCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -310,6 +327,15 @@ DistributionSchedule getDistributionSchedule() {
         return distributionSchedule;
     }
 
+    /**
+     * Get the health info for bookies for this ledger.
+     *
+     * @return BookiesHealthInfo for every bookie in the write set.
+     */
+    public BookiesHealthInfo getBookiesHealthInfo() {
+        return bookiesHealthInfo;
+    }
+
     void writeLedgerConfig(GenericCallback<Void> writeCb) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Writing metadata to ledger manager: {}, {}", 
this.ledgerId, metadata.getVersion());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 11b80387a..a3433ed9d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -86,16 +86,18 @@
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final LedgerEntryImpl entryImpl;
+        final long eId;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
+            this.eId = eId;
 
             if (lh.bk.isReorderReadSequence()) {
                 writeSet = lh.bk.getPlacementPolicy()
                     .reorderReadSequence(
                             ensemble,
-                            lh.bookieFailureHistory.asMap(),
+                            lh.getBookiesHealthInfo(),
                             lh.distributionSchedule.getWriteSet(eId));
             } else {
                 writeSet = lh.distributionSchedule.getWriteSet(eId);
@@ -420,6 +422,21 @@ synchronized void logErrorAndReattemptRead(int 
bookieIndex, BookieSocketAddress
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf 
buffer) {
+            boolean completed = super.complete(bookieIndex, host, buffer);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any 
slow bookies before
+                // the first successful speculative read as "slow"
+                for (int i = 0 ; i < numReplicasTried - 1; i++) {
+                    int slowBookieIndex = writeSet.get(i);
+                    BookieSocketAddress slowBookieSocketAddress = 
ensemble.get(slowBookieIndex);
+                    
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, eId);
+                }
+            }
+            return completed;
+        }
     }
 
     PendingReadOp(LedgerHandle lh,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index b3b633fc1..7a409e840 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -19,8 +19,6 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -120,18 +118,18 @@ public BookieSocketAddress replaceBookie(
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadSequence(ensemble, bookiesHealthInfo,
                                          writeSet);
     }
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+        return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
                                             writeSet);
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index b08ce4340..a673cc823 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -19,7 +19,6 @@
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -30,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Preconditions;
@@ -48,6 +48,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
@@ -55,6 +59,8 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 
+import static 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
+
 /**
  * Simple rackware ensemble placement policy.
  *
@@ -79,7 +85,8 @@
     static final int REMOTE_MASK      = 0x04 << 24;
     static final int REMOTE_FAIL_MASK = 0x08 << 24;
     static final int READ_ONLY_MASK   = 0x10 << 24;
-    static final int UNAVAIL_MASK     = 0x20 << 24;
+    static final int SLOW_MASK        = 0x20 << 24;
+    static final int UNAVAIL_MASK     = 0x40 << 24;
     static final int MASK_BITS        = 0xFFF << 20;
 
     static class DefaultResolver implements DNSToSwitchMapping {
@@ -175,6 +182,8 @@ public void reloadCachedMappings() {
     protected DNSToSwitchMapping dnsResolver;
     protected HashedWheelTimer timer;
     protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+    // Use a loading cache so slow bookies are expired. Use entryId as values.
+    protected Cache<BookieSocketAddress, Long> slowBookies;
     protected BookieNode localNode;
     protected final ReentrantReadWriteLock rwLock;
     protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
@@ -294,6 +303,14 @@ public RackawareEnsemblePlacementPolicyImpl 
initialize(ClientConfiguration conf,
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
             }
         }
+        slowBookies = CacheBuilder.newBuilder()
+            .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), 
TimeUnit.MILLISECONDS)
+            .build(new CacheLoader<BookieSocketAddress, Long>() {
+                @Override
+                public Long load(BookieSocketAddress bookieSocketAddress) 
throws Exception {
+                    return -1L;
+                }
+            });
         return initialize(
                 dnsResolver,
                 timer,
@@ -779,37 +796,106 @@ protected BookieNode selectRandomFromRack(String 
netPath, Set<Node> excludeBooki
         throw new BKNotEnoughBookiesException();
     }
 
+    @Override
+    public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
+        slowBookies.put(bookieSocketAddress, entryId);
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
+        Map<Integer, String> writeSetWithRegion = new HashMap<>();
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSetWithRegion.put(writeSet.get(i), "");
+        }
+        return reorderReadSequenceWithRegion(ensemble, writeSet, 
writeSetWithRegion, bookiesHealthInfo, false, "", writeSet.size());
+
+    }
+
+    /**
+     * This function orders the read sequence with a given region. For 
region-unaware policies (e.g.
+     * RackAware), we pass in false for regionAware and an empty myRegion. 
When this happens, any
+     * remote list will stay empty. The ordering is as follows (the R* at the 
beginning of each list item
+     * is only present for region-aware policies):
+     *      1. available (local) bookies
+     *      2. R* a remote bookie (based on remoteNodeInReorderSequence)
+     *      3. R* remaining (local) bookies
+     *      4. R* remaining remote bookies
+     *      5. read only bookies
+     *      6. slow bookies
+     *      7. unavailable bookies
+     *
+     * @param ensemble
+     *          ensemble of bookies
+     * @param writeSet
+     *          write set
+     * @param writeSetWithRegion
+     *          write set with region information
+     * @param bookiesHealthInfo
+     *          heuristics about health of bookies
+     * @param regionAware
+     *          whether or not a region-aware policy is used
+     * @param myRegion
+     *          current region of policy
+     * @param remoteNodeInReorderSequence
+     *          number of local bookies to try before trying a remote bookie
+     * @return ordering of bookies to send read to
+     */
+    DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
+        ArrayList<BookieSocketAddress> ensemble,
+        DistributionSchedule.WriteSet writeSet,
+        Map<Integer, String> writeSetWithRegion,
+        BookiesHealthInfo bookiesHealthInfo,
+        boolean regionAware,
+        String myRegion,
+        int remoteNodeInReorderSequence) {
+        boolean useRegionAware = regionAware && 
(!myRegion.equals(UNKNOWN_REGION));
         int ensembleSize = ensemble.size();
 
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             BookieSocketAddress address = ensemble.get(idx);
-            Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+            String region = writeSetWithRegion.get(idx);
+            Long lastFailedEntryOnBookie = 
bookiesHealthInfo.getBookieFailureHistory(address);
             if (null == knownBookies.get(address)) {
                 // there isn't too much differences between readonly bookies
                 // from unavailable bookies. since there
                 // is no write requests to them, so we shouldn't try reading
-                // from readonly bookie in prior to writable
-                // bookies.
+                // from readonly bookie prior to writable bookies.
                 if ((null == readOnlyBookies)
-                        || !readOnlyBookies.contains(address)) {
+                    || !readOnlyBookies.contains(address)) {
                     writeSet.set(i, idx | UNAVAIL_MASK);
                 } else {
-                    writeSet.set(i, idx | READ_ONLY_MASK);
+                    if (slowBookies.getIfPresent(address) != null) {
+                        int numPendingReqs = 
bookiesHealthInfo.getBookiePendingRequests(address);
+                        // use slow bookies with less pending requests first
+                        long slowIdx = numPendingReqs * ensembleSize + idx;
+                        writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | 
SLOW_MASK);
+                    } else {
+                        writeSet.set(i, idx | READ_ONLY_MASK);
+                    }
+                }
+            } else if (lastFailedEntryOnBookie < 0) {
+                if (slowBookies.getIfPresent(address) != null) {
+                    int numPendingReqs = 
bookiesHealthInfo.getBookiePendingRequests(address);
+                    long slowIdx = numPendingReqs * ensembleSize + idx;
+                    writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
+                } else {
+                    if (useRegionAware && !myRegion.equals(region)) {
+                        writeSet.set(i, idx | REMOTE_MASK);
+                    } else {
+                        writeSet.set(i, idx | LOCAL_MASK);
+                    }
                 }
             } else {
-                if ((lastFailedEntryOnBookie == null)
-                        || (lastFailedEntryOnBookie < 0)) {
-                    writeSet.set(i, idx | LOCAL_MASK);
+                // use bookies with earlier failed entryIds first
+                long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
+                if (useRegionAware && !myRegion.equals(region)) {
+                    writeSet.set(i, (int) (failIdx & ~MASK_BITS) | 
REMOTE_FAIL_MASK);
                 } else {
-                    long failIdx = lastFailedEntryOnBookie * ensembleSize + 
idx;
-                    writeSet.set(i,
-                                 (int)(failIdx & ~MASK_BITS) | 
LOCAL_FAIL_MASK);
+                    writeSet.set(i, (int) (failIdx & ~MASK_BITS) | 
LOCAL_FAIL_MASK);
                 }
             }
         }
@@ -827,15 +913,54 @@ protected BookieNode selectRandomFromRack(String netPath, 
Set<Node> excludeBooki
 
         if (reorderReadsRandom) {
             shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+            shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
             shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
             shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
         }
 
-        // remove all masks
+        // nodes within a region are ordered as follows
+        // (Random?) list of nodes that have no history of failure
+        // Nodes with Failure history are ordered in the reverse
+        // order of the most recent entry that generated an error
+        // The sort will have put them in correct order,
+        // so remove the bits that sort by age.
         for (int i = 0; i < writeSet.size(); i++) {
-            writeSet.set(i, (writeSet.get(i) & ~MASK_BITS) % ensembleSize);
+            int mask = writeSet.get(i) & MASK_BITS;
+            int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
+            if (mask == LOCAL_FAIL_MASK) {
+                writeSet.set(i, LOCAL_MASK | idx);
+            } else if (mask == REMOTE_FAIL_MASK) {
+                writeSet.set(i, REMOTE_MASK | idx);
+            } else if (mask == SLOW_MASK) {
+                writeSet.set(i, SLOW_MASK | idx);
+            }
         }
 
+        // Insert a node from the remote region at the specified location so
+        // we try more than one region within the max allowed latency
+        int firstRemote = -1;
+        for (int i = 0; i < writeSet.size(); i++) {
+            if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
+                firstRemote = i;
+                break;
+            }
+        }
+        if (firstRemote != -1) {
+            int i = 0;
+            for (;i < remoteNodeInReorderSequence
+                && i < writeSet.size(); i++) {
+                if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
+                    break;
+                }
+            }
+            writeSet.moveAndShift(firstRemote, i);
+        }
+
+
+        // remove all masks
+        for (int i = 0; i < writeSet.size(); i++) {
+            writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+        }
         return writeSet;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 9ee75d954..b3e9bc44f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -83,7 +83,7 @@
             this.writeSet = lh.distributionSchedule.getWriteSet(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = 
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
-                        lh.bookieFailureHistory.asMap(), writeSet.copy());
+                    lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
             }
@@ -407,6 +407,22 @@ synchronized void logErrorAndReattemptRead(int 
bookieIndex, BookieSocketAddress
             }
         }
 
+        @Override
+        boolean complete(int bookieIndex, BookieSocketAddress host, final 
ByteBuf buffer, long entryId) {
+            boolean completed = super.complete(bookieIndex, host, buffer, 
entryId);
+            if (completed) {
+                int numReplicasTried = getNextReplicaIndexToReadFrom();
+                // Check if any speculative reads were issued and mark any 
bookies before the
+                // first speculative read as slow
+                for (int i = 0; i < numReplicasTried - 1; i++) {
+                    int slowBookieIndex = orderedEnsemble.get(i);
+                    BookieSocketAddress slowBookieAddress = 
ensemble.get(slowBookieIndex);
+                    
lh.bk.placementPolicy.registerSlowBookie(slowBookieAddress, entryId);
+                }
+            }
+            return completed;
+        }
+
     }
 
     ReadLastConfirmedAndEntryOp(LedgerHandle lh,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 8342a6b1c..daa30d5e4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -17,10 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,7 +33,6 @@
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
-import org.apache.bookkeeper.net.NetUtils;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.net.NodeBase;
@@ -503,127 +500,32 @@ protected BookieNode replaceFromRack(BookieNode 
bookieNodeToReplace,
     @Override
     public final DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
-        if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadSequence(ensemble, bookieFailureHistory,
-                                             writeSet);
+        if (UNKNOWN_REGION.equals(myRegion)) {
+            return super.reorderReadSequence(ensemble, bookiesHealthInfo, 
writeSet);
         } else {
-            int ensembleSize = ensemble.size();
-
+            Map<Integer, String> writeSetWithRegion = new HashMap<>();
             for (int i = 0; i < writeSet.size(); i++) {
                 int idx = writeSet.get(i);
-                BookieSocketAddress address = ensemble.get(idx);
-                String region = getRegion(address);
-                Long lastFailedEntryOnBookie = 
bookieFailureHistory.get(address);
-                if (null == knownBookies.get(address)) {
-                    // there isn't too much differences between readonly 
bookies
-                    // from unavailable bookies. since there
-                    // is no write requests to them, so we shouldn't try 
reading
-                    // from readonly bookie in prior to writable bookies.
-                    if ((null == readOnlyBookies)
-                            || !readOnlyBookies.contains(address)) {
-                        writeSet.set(i, idx | UNAVAIL_MASK);
-                    } else {
-                        writeSet.set(i, idx | READ_ONLY_MASK);
-                    }
-                } else if (region.equals(myRegion)) {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | LOCAL_MASK);
-                    } else {
-                        long failIdx
-                            = lastFailedEntryOnBookie * ensembleSize + idx;
-                        writeSet.set(i, (int)(failIdx & ~MASK_BITS)
-                                     | LOCAL_FAIL_MASK);
-                    }
-                } else {
-                    if ((lastFailedEntryOnBookie == null)
-                            || (lastFailedEntryOnBookie < 0)) {
-                        writeSet.set(i, idx | REMOTE_MASK);
-                    } else {
-                        long failIdx
-                            = lastFailedEntryOnBookie * ensembleSize + idx;
-                        writeSet.set(i, (int)(failIdx & ~MASK_BITS)
-                                     | REMOTE_FAIL_MASK);
-                    }
-                }
-            }
-
-            // Add a mask to ensure the sort is stable, sort,
-            // and then remove mask. This maintains stability as
-            // long as there are fewer than 16 bookies in the write set.
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
-            }
-            writeSet.sort();
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
-            }
-
-            if (reorderReadsRandom) {
-                shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
-                shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
-            }
-
-            // nodes within a region are ordered as follows
-            // (Random?) list of nodes that have no history of failure
-            // Nodes with Failure history are ordered in the reverse
-            // order of the most recent entry that generated an error
-            // The sort will have put them in correct order,
-            // so remove the bits that sort by age.
-            for (int i = 0; i < writeSet.size(); i++) {
-                int mask = writeSet.get(i) & MASK_BITS;
-                int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
-                if (mask == LOCAL_FAIL_MASK) {
-                    writeSet.set(i, LOCAL_MASK | idx);
-                } else if (mask == REMOTE_FAIL_MASK) {
-                    writeSet.set(i, REMOTE_MASK | idx);
-                }
-            }
-
-            // Insert a node from the remote region at the specified location 
so
-            // we try more than one region within the max allowed latency
-            int firstRemote = -1;
-            for (int i = 0; i < writeSet.size(); i++) {
-                if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
-                    firstRemote = i;
-                    break;
-                }
-            }
-            if (firstRemote != -1) {
-                int i = 0;
-                for (;i < REMOTE_NODE_IN_REORDER_SEQUENCE
-                         && i < writeSet.size(); i++) {
-                    if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
-                        break;
-                    }
-                }
-                writeSet.moveAndShift(firstRemote, i);
-            }
-
-
-            // remove all masks
-            for (int i = 0; i < writeSet.size(); i++) {
-                writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+                writeSetWithRegion.put(idx, getRegion(ensemble.get(idx)));
             }
-            return writeSet;
+            return super.reorderReadSequenceWithRegion(ensemble, writeSet, 
writeSetWithRegion,
+                bookiesHealthInfo, true, myRegion, 
REMOTE_NODE_IN_REORDER_SEQUENCE);
         }
     }
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
-            return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+            return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
                                                 writeSet);
         }
         DistributionSchedule.WriteSet finalList
-            = reorderReadSequence(ensemble, bookieFailureHistory, writeSet);
+            = reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
         finalList.addMissingIndices(ensemble.size());
         return finalList;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index c99b4fc66..0e93b1dd1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -20,8 +20,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
@@ -454,7 +452,7 @@ public String toString() {
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
     }
@@ -462,10 +460,10 @@ public String toString() {
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
             ArrayList<BookieSocketAddress> ensemble,
-            Map<BookieSocketAddress, Long> bookieFailureHistory,
+            BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         DistributionSchedule.WriteSet retList = reorderReadSequence(
-                ensemble, bookieFailureHistory, writeSet);
+                ensemble, bookiesHealthInfo, writeSet);
         retList.addMissingIndices(ensemble.size());
         return retList;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
index f7b36c156..123336104 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
@@ -24,6 +24,7 @@
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
+
 import org.apache.bookkeeper.client.api.LedgerEntry;
 
 /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index f4ffd521f..e1da67805 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -113,6 +113,7 @@
     // Ensemble Placement Policy
     protected final static String ENSEMBLE_PLACEMENT_POLICY = 
"ensemblePlacementPolicy";
     protected final static String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = 
"networkTopologyStabilizePeriodSeconds";
+    protected final static String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES 
= "ensemblePlacementPolicyOrderSlowBookies";
 
     // Ledger Metadata Parameters
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = 
"storeSystemTimeAsLedgerCreationTime";
@@ -1097,6 +1098,27 @@ public ClientConfiguration 
setNetworkTopologyStabilizePeriodSeconds(int seconds)
         return this;
     }
 
+    /**
+     * Whether to order slow bookies in placement policy.
+     *
+     * @return flag to order or ignore slow bookies in placement policy
+     */
+    public boolean getEnsemblePlacementPolicySlowBookies() {
+        return getBoolean(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, false);
+    }
+
+    /**
+     * Enable/Disable ordering slow bookies in placement policy.
+     *
+     * @param enabled
+     *          flag to enable/disable ordering slow bookies in placement 
policy.
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnsemblePlacementPolicySlowBookies(boolean 
enabled) {
+        setProperty(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, enabled);
+        return this;
+    }
+
     /**
      * Whether to enable recording task execution stats.
      *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ae7f806ab..8a5bfcbe1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -149,7 +149,7 @@ public PerChannelBookieClient create(BookieSocketAddress 
address, PerChannelBook
                 authProviderFactory, registry, pcbcPool, shFactory);
     }
 
-    private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
+    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClientPool clientPool = channels.get(addr);
         if (null == clientPool) {
             closeLock.readLock().lock();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index ba1b444e5..a7c09ade2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -113,4 +113,13 @@ public void close(boolean wait) {
             pcbc.close(wait);
         }
     }
+
+    @Override
+    public int getNumPendingCompletionRequests() {
+        int numPending = 0;
+        for (PerChannelBookieClient pcbc: clients) {
+            numPending += pcbc.getNumPendingCompletionRequests();
+        }
+        return numPending;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c392c794f..dbe1e214d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -360,6 +360,10 @@ private void 
completeOperation(GenericCallback<PerChannelBookieClient> op, int r
         }
     }
 
+    int getNumPendingCompletionRequests() {
+        return completionObjects.size();
+    }
+
     protected ChannelFuture connect() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index b7618e307..14b6e7ac1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -25,7 +25,7 @@
 /**
  * An interface to manage channel pooling for bookie client.
  */
-interface PerChannelBookieClientPool {
+public interface PerChannelBookieClientPool {
 
     /**
      * intialize the pool. the implementation should not be blocked.
@@ -61,4 +61,8 @@
      */
     void close(boolean wait);
 
+    /**
+     * Get the total number of pending completion requests across the pool's channels.
+     */
+    int getNumPendingCompletionRequests();
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index c79679a02..06b136e54 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -26,7 +26,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +42,6 @@
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -108,6 +106,25 @@ static void updateMyRack(String rack) throws Exception {
         StaticDNSResolver.addNodeToRack("localhost", rack);
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, 
Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, 
Integer> bookiePendingRequests) {
+        return new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, 
-1L);
+            }
+
+            @Override
+            public int getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 
0);
+            }
+        };
+    }
+
     @Test
     public void testNodeDown() throws Exception {
         repp.uninitalize();
@@ -128,8 +145,7 @@ public void testNodeDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(),
-                writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(1, 2, 3, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -160,7 +176,38 @@ public void testNodeReadOnly() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(1, 2, 3, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(1, 2, 3, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -168,6 +215,39 @@ public void testNodeReadOnly() throws Exception {
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new 
HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr1, 1);
+        bookiePendingMap.put(addr2, 2);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(new 
HashMap<>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = 
repp.reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(2, 3, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
@@ -190,7 +270,7 @@ public void testTwoNodesDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(2, 3, 0, 1);
         LOG.info("reorder set : {}", reorderSet);
@@ -219,11 +299,84 @@ public void testNodeDownAndReadOnly() throws Exception {
         Set<BookieSocketAddress> roAddrs = new HashSet<BookieSocketAddress>();
         roAddrs.add(addr2);
         repp.onClusterChanged(addrs, roAddrs);
+
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(2, 3, 1, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new 
HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr1, 1);
+        addrs.remove(addr2);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(2, 3, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1);
+        addrs.remove(addr2);
+        Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new 
HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr3, 1);
+        repp.onClusterChanged(addrs, ro);
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
         assertFalse(reorderSet.equals(origWriteSet));
         assertEquals(expectedSet, reorderSet);
     }
@@ -714,8 +867,12 @@ public void testNodeWithFailures() throws Exception {
         bookieFailures.put(addr1, 20L);
         bookieFailures.put(addr2, 22L);
 
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            bookieFailures,
+            new HashMap<>()
+        );
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet);
+                ensemble, bookiesHealthInfo, writeSet);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(2)), addr1);
         assertEquals(ensemble.get(reoderSet.get(3)), addr2);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index d2cfcc402..441897e89 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -19,14 +19,13 @@
 
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.HashedWheelTimer;
@@ -111,6 +110,25 @@ protected void tearDown() throws Exception {
         super.tearDown();
     }
 
+    static BookiesHealthInfo getBookiesHealthInfo() {
+        return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+    }
+
+    static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, 
Long> bookieFailureHistory,
+                                                  Map<BookieSocketAddress, 
Integer> bookiePendingRequests) {
+        return new BookiesHealthInfo() {
+            @Override
+            public long getBookieFailureHistory(BookieSocketAddress 
bookieSocketAddress) {
+                return bookieFailureHistory.getOrDefault(bookieSocketAddress, 
-1L);
+            }
+
+            @Override
+            public int getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
+                return bookiePendingRequests.getOrDefault(bookieSocketAddress, 
0);
+            }
+        };
+    }
+
     @Test
     public void testNotReorderReadIfInDefaultRack() throws Exception {
         repp.uninitalize();
@@ -121,7 +139,7 @@ public void testNotReorderReadIfInDefaultRack() throws 
Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         assertEquals(origWriteSet, reorderSet);
     }
 
@@ -144,7 +162,7 @@ public void testNodeInSameRegion() throws Exception {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), 
writeSet.copy());
+                ensemble, getBookiesHealthInfo(), writeSet.copy());
         DistributionSchedule.WriteSet expectedSet = writeSetFromValues(0, 3, 
1, 2);
         LOG.info("write set : {}", writeSet);
         LOG.info("reorder set : {}", reorderSet);
@@ -164,7 +182,7 @@ public void testNodeNotInSameRegions() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         LOG.info("reorder set : {}", reorderSet);
         assertEquals(origWriteSet, reorderSet);
     }
@@ -189,7 +207,7 @@ public void testNodeDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(3, 1, 2, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -219,7 +237,39 @@ public void testNodeReadOnly() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new 
HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr1, 1);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(3, 1, 2, 0);
         LOG.info("reorder set : {}", reorderSet);
@@ -227,6 +277,40 @@ public void testNodeReadOnly() throws Exception {
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testTwoNodesSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        repp.registerSlowBookie(addr2, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr1, 1);
+        bookiePendingMap.put(addr2, 2);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(3, 2, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testTwoNodesDown() throws Exception {
         repp.uninitalize();
@@ -248,7 +332,40 @@ public void testTwoNodesDown() throws Exception {
 
         DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
         DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
-                ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+                ensemble, getBookiesHealthInfo(), writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(3, 2, 0, 1);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
+    @Test
+    public void testNodeDownAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        repp.registerSlowBookie(addr1, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr1, 1);
+        addrs.remove(addr2);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
         DistributionSchedule.WriteSet expectedSet
             = writeSetFromValues(3, 2, 0, 1);
         LOG.info("reorder set : {}", reorderSet);
@@ -256,6 +373,42 @@ public void testTwoNodesDown() throws Exception {
         assertEquals(expectedSet, reorderSet);
     }
 
+    @Test
+    public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        addrs.remove(addr1);
+        addrs.remove(addr2);
+        Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+        ro.add(addr2);
+        repp.registerSlowBookie(addr3, 0L);
+        Map<BookieSocketAddress, Integer> bookiePendingMap = new HashMap<BookieSocketAddress, Integer>();
+        bookiePendingMap.put(addr3, 1);
+        repp.onClusterChanged(addrs, ro);
+
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            new HashMap<BookieSocketAddress, Long>(), bookiePendingMap);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, bookiesHealthInfo, writeSet);
+        DistributionSchedule.WriteSet expectedSet
+            = writeSetFromValues(3, 1, 2, 0);
+        LOG.info("reorder set : {}", reorderSet);
+        assertFalse(reorderSet.equals(origWriteSet));
+        assertEquals(expectedSet, reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -1046,11 +1199,11 @@ private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolea
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(), writeSet);
+                        getBookiesHealthInfo(), writeSet);
             }
 
             LOG.info("Reorder {} => {}.", origWriteSet, readSet);
@@ -1102,12 +1255,12 @@ private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boole
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
                         ensemble,
-                        new HashMap<BookieSocketAddress, Long>(),
+                        getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1177,11 +1330,11 @@ private void reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(boolean isR
             DistributionSchedule.WriteSet readSet;
             if (isReadLAC) {
                 readSet = repp.reorderReadLACSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             } else {
                 readSet = repp.reorderReadSequence(
-                        ensemble, new HashMap<BookieSocketAddress, Long>(),
+                        ensemble, getBookiesHealthInfo(),
                         writeSet.copy());
             }
 
@@ -1271,8 +1424,10 @@ public void testNodeWithFailures() throws Exception {
         bookieFailures.put(addr4, 25L);
 
         LOG.info("write set : {}", writeSet2);
+        BookiesHealthInfo bookiesHealthInfo = getBookiesHealthInfo(
+            bookieFailures, new HashMap<>());
         DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
-                ensemble, bookieFailures, writeSet2);
+                ensemble, bookiesHealthInfo, writeSet2);
         LOG.info("reorder set : {}", reoderSet);
         assertEquals(ensemble.get(reoderSet.get(0)), addr6);
         assertEquals(ensemble.get(reoderSet.get(1)), addr7);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index acfeb8cba..a188f8dbf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -27,6 +28,8 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -68,7 +71,9 @@ long getLedgerToRead(int ensemble, int quorum) throws Exception {
     BookKeeper createClient(int specTimeout) throws Exception {
         ClientConfiguration conf = new ClientConfiguration()
             .setSpeculativeReadTimeout(specTimeout)
-            .setReadTimeout(30000);
+            .setReadTimeout(30000)
+            .setReorderReadSequenceEnabled(true)
+            .setEnsemblePlacementPolicySlowBookies(true);
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         return new BookKeeper(conf);
     }
@@ -149,6 +154,9 @@ public void testSpeculativeRead() throws Exception {
             lspec.asyncReadEntries(1, 1, speccb, null);
             speccb.expectSuccess(4000);
             nospeccb.expectTimeout(4000);
+            // Check that the second bookie is registered as a slow bookie at entryId 1
+            assertTrue(((RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy).slowBookies.asMap().size() == 1);
+            assertTrue(((RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy).slowBookies.asMap().get(second) == 1L);
         } finally {
             sleepLatch.countDown();
             lspec.close();
@@ -194,16 +202,18 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception {
                        latch1.getDuration() >= timeout*2
                        && latch1.getDuration() < timeout*3);
 
-            // third should have to hit one timeouts (bookie 2)
+            // bookies 1 & 2 should be registered as slow bookies because of speculative reads
+            Set<BookieSocketAddress> expectedSlowBookies = new HashSet<BookieSocketAddress>();
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(1));
+            expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(2));
+            assertEquals(((RackawareEnsemblePlacementPolicy) l.bk.placementPolicy).slowBookies.asMap().keySet(),
+                expectedSlowBookies);
+
+            // third should not hit timeouts since bookies 1 & 2 are registered as slow
             // bookie 3 has the entry
             LatchCallback latch2 = new LatchCallback();
             l.asyncReadEntries(2, 2, latch2, null);
-            latch2.expectTimeout(timeout/2);
             latch2.expectSuccess(timeout);
-            LOG.info("Timeout {} latch2 duration {}", timeout, latch2.getDuration());
-            assertTrue("should have taken longer than one timeout, but less than 2",
-                       latch2.getDuration() >= timeout
-                       && latch2.getDuration() < timeout*2);
 
             // fourth should have no timeout
             // bookie 3 has the entry


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to