This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 134a6c2  Issue #1489: Better Prevent Read Outliers during short-term 
Bookie Slow-Down
134a6c2 is described below

commit 134a6c25a846731dc2ad0dc6c3e848170557b7b9
Author: Nicolas Michael <[email protected]>
AuthorDate: Wed Jun 13 17:41:02 2018 -0700

    Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down
    
    ### Motivation
    
    Bookies can temporarily be slow for a large number of reasons, often for 
just a brief time of a few milliseconds to seconds, such as during Java Garbage 
Collection or EntryLog compaction. For writes, latencies of individual bookies 
are masked by acknowledging the client after a quorum of bookies have replied. 
However for reads, we don't have any equivalent feature to mask short-term 
latencies of individual bookies yet (in case of SequenceReadRequests). This PR 
implements such a feature b [...]
    
    ### Changes
    This change implements a configurable reordering of read requests in 
the BookKeeper client based on the number of pending requests to each bookie that 
could service the request. The intention is to mask the latency of one bookie 
by directing a read request to another bookie that could potentially service 
the request faster. This should help prevent read time outliers due to bookies 
that are temporarily responding slowly, for example due to Java garbage 
collection, compaction, or any other ki [...]
    
    Reordering of reads is based on a threshold of relative queue length to 
other bookies. Setting the threshold very low will more frequently reorder the 
read set and potentially result in better latency, but will also reduce data 
affinity of reads. Reads sent to a bookie other than the preferred one have a low 
chance to be served from file system cache on that bookie, and will likely 
result in a physical read. Small thresholds therefore shuffle read requests 
more among bookies and may lead to [...]
    
    Master Issue: #1489
    
    Author: Nicolas Michael <[email protected]>
    
    Reviewers: Yiming Zang <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #1504 from nicmichael/ReadReordering, closes #1489
---
 .../bookkeeper/client/BookKeeperClientStats.java   |   1 +
 .../org/apache/bookkeeper/client/LedgerHandle.java |  10 +-
 .../client/RackawareEnsemblePlacementPolicy.java   |  15 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      |  51 ++++++-
 .../client/RegionAwareEnsemblePlacementPolicy.java |   6 +-
 .../bookkeeper/conf/ClientConfiguration.java       |  25 ++++
 .../TestRackawareEnsemblePlacementPolicy.java      | 166 +++++++++++++++++++++
 7 files changed, 264 insertions(+), 10 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 83e6421..749ac9c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -54,6 +54,7 @@ public interface BookKeeperClientStats {
     String LAC_UPDATE_MISSES = "LAC_UPDATE_MISSES";
     String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
     String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
+    String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
 
     // per channel stats
     String CHANNEL_SCOPE = "per_channel_bookie_client";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9f90c5a..a2a37e9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -99,6 +99,8 @@ import org.slf4j.LoggerFactory;
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
+    static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
+
     final byte[] ledgerKey;
     LedgerMetadata metadata;
     final BookKeeper bk;
@@ -224,7 +226,13 @@ public class LedgerHandle implements WriteHandle {
             @Override
             public long getBookiePendingRequests(BookieSocketAddress 
bookieSocketAddress) {
                 PerChannelBookieClientPool pcbcPool = 
bk.bookieClient.lookupClient(bookieSocketAddress);
-                return pcbcPool == null ? 0 : 
pcbcPool.getNumPendingCompletionRequests();
+                if (pcbcPool == null) {
+                    return 0;
+                } else if (pcbcPool.isWritable(ledgerId)) {
+                    return pcbcPool.getNumPendingCompletionRequests();
+                } else {
+                    return pcbcPool.getNumPendingCompletionRequests() | 
PENDINGREQ_NOTWRITABLE_MASK;
+                }
             }
         };
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 32f94f3..e3ce8ff 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -50,19 +50,22 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
                                                           HashedWheelTimer 
timer,
                                                           boolean 
reorderReadsRandom,
                                                           int 
stabilizePeriodSeconds,
+                                                          int 
reorderThresholdPendingRequests,
                                                           boolean isWeighted,
                                                           int 
maxWeightMultiple,
                                                           int 
minNumRacksPerWriteQuorum,
                                                           StatsLogger 
statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
-            slave = new 
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
-            slave.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
reorderThresholdPendingRequests, isWeighted,
                     maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            slave = new 
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+            slave.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, 
maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
         } else {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
+                    reorderThresholdPendingRequests, isWeighted, 
maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7416372..23afaa9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -198,11 +198,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     protected boolean reorderReadsRandom = false;
     protected boolean enforceDurability = false;
     protected int stabilizePeriodSeconds = 0;
+    protected int reorderThresholdPendingRequests = 0;
     // looks like these only assigned in the same thread as constructor, 
immediately after constructor;
     // no need to make volatile
     protected StatsLogger statsLogger = null;
     protected OpStatsLogger bookiesJoinedCounter = null;
     protected OpStatsLogger bookiesLeftCounter = null;
+    protected OpStatsLogger readReorderedCounter = null;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -232,6 +234,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                                                               HashedWheelTimer 
timer,
                                                               boolean 
reorderReadsRandom,
                                                               int 
stabilizePeriodSeconds,
+                                                              int 
reorderThresholdPendingRequests,
                                                               boolean 
isWeighted,
                                                               int 
maxWeightMultiple,
                                                               int 
minNumRacksPerWriteQuorum,
@@ -240,8 +243,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         this.statsLogger = statsLogger;
         this.bookiesJoinedCounter = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_JOINED);
         this.bookiesLeftCounter = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_LEFT);
+        this.readReorderedCounter = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+        this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> 
this.getDefaultRack());
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
@@ -330,6 +335,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 timer,
                 conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
+                conf.getReorderThresholdPendingRequests(),
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 conf.getMinNumRacksPerWriteQuorum(),
@@ -952,7 +958,11 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
 
     @Override
     public void registerSlowBookie(BookieSocketAddress bookieSocketAddress, 
long entryId) {
-        slowBookies.put(bookieSocketAddress, entryId);
+        if (reorderThresholdPendingRequests <= 0) {
+            // only put bookies on slowBookies list if 
reorderThresholdPendingRequests is *not* set (0);
+            // otherwise, rely on reordering of reads based on 
reorderThresholdPendingRequests
+            slowBookies.put(bookieSocketAddress, entryId);
+        }
     }
 
     @Override
@@ -1026,7 +1036,45 @@ public class RackawareEnsemblePlacementPolicyImpl 
extends TopologyAwareEnsembleP
             }
         }
 
+        boolean reordered = false;
+        if (reorderThresholdPendingRequests > 0) {
+            // if there are no slow or unavailable bookies, capture each 
bookie's number of
+            // pending requests to reorder requests based on a threshold of 
pending requests
+
+            // number of pending requests per bookie (same index as writeSet)
+            long[] pendingReqs = new long[writeSet.size()];
+            int bestBookieIdx = -1;
+
+            for (int i = 0; i < writeSet.size(); i++) {
+                pendingReqs[i] = 
bookiesHealthInfo.getBookiePendingRequests(ensemble.get(writeSet.get(i)));
+                if (bestBookieIdx < 0 || pendingReqs[i] < 
pendingReqs[bestBookieIdx]) {
+                    bestBookieIdx = i;
+                }
+            }
+
+            // reorder the writeSet if the currently first bookie in our 
writeSet has at
+            // least
+            // reorderThresholdPendingRequests more outstanding request than 
the best bookie
+            if (bestBookieIdx > 0 && pendingReqs[0] >= 
pendingReqs[bestBookieIdx] + reorderThresholdPendingRequests) {
+                // We're not reordering the entire write set, but only move 
the best bookie
+                // to the first place. Chances are good that this bookie will 
be fast enough
+                // to not trigger the speculativeReadTimeout. But even if it 
hits that timeout,
+                // things may have changed by then so much that whichever 
bookie we put second
+                // may actually not be the second-best choice any more.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("read set reordered from {} ({} pending) to {} 
({} pending)",
+                            ensemble.get(writeSet.get(0)), pendingReqs[0], 
ensemble.get(writeSet.get(bestBookieIdx)),
+                            pendingReqs[bestBookieIdx]);
+                }
+                writeSet.moveAndShift(bestBookieIdx, 0);
+                reordered = true;
+            }
+        }
+
         if (!isAnyBookieUnavailable) {
+            if (reordered) {
+                readReorderedCounter.registerSuccessfulValue(1);
+            }
             return writeSet;
         }
 
@@ -1137,6 +1185,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         for (int i = 0; i < writeSet.size(); i++) {
             writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
         }
+        readReorderedCounter.registerSuccessfulValue(1);
         return writeSet;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 6205174..218781b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -130,7 +130,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, 
this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -178,7 +179,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
             for (String region: regions) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.reorderThresholdPendingRequests, 
this.isWeighted, this.maxWeightMultiple,
+                                this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = 
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 9f09c37..340b40b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -149,6 +149,7 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     // Ensemble Placement Policy
     protected static final String ENSEMBLE_PLACEMENT_POLICY = 
"ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = 
"networkTopologyStabilizePeriodSeconds";
+    protected static final String READ_REORDER_THRESHOLD_PENDING_REQUESTS = 
"readReorderThresholdPendingRequests";
     protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES 
=
         "ensemblePlacementPolicyOrderSlowBookies";
 
@@ -1127,6 +1128,30 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Get the threshold for the number of pending requests beyond which to 
reorder
+     * reads. If &lt;= zero, this feature is turned off.
+     *
+     * @return the threshold for the number of pending requests beyond which to
+     *         reorder reads.
+     */
+    public int getReorderThresholdPendingRequests() {
+        return getInt(READ_REORDER_THRESHOLD_PENDING_REQUESTS, 0);
+    }
+
+    /**
+     * Set the threshold for the number of pending requests beyond which to 
reorder
+     * reads. If &lt;= zero, this feature is turned off.
+     *
+     * @param threshold
+     *            The threshold for the number of pending requests beyond 
which to
+     *            reorder reads.
+     */
+    public ClientConfiguration setReorderThresholdPendingRequests(int 
threshold) {
+        setProperty(READ_REORDER_THRESHOLD_PENDING_REQUESTS, threshold);
+        return this;
+    }
+
+    /**
      * Get the network topology stabilize period in seconds. if it is zero, 
this feature is turned off.
      *
      * @return network topology stabilize period in seconds.
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index ad9c9c8..cdff679 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -372,6 +372,172 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         assertEquals(expectedSet, reorderSet);
     }
 
+    /*
+     * Tests the reordering of the writeSet based on number of pending 
requests.
+     * Expect the third bookie to be placed first since its number of pending 
requests
+     * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally 
first bookie.
+     */
+    @Test
+    public void testPendingRequestsReorder() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 20L);
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie -> this one first
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 0, 
1, 3);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 2 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending 
requests for
+     * an ensemble that is larger than the writeSet.
+     * Expect the sixth bookie to be placed first since its number of pending 
requests
+     * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally 
first bookie.
+     */
+    @Test
+    public void testPendingRequestsReorderLargeEnsemble() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+        addrs.add(addr6);
+        addrs.add(addr7);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // not in write set
+        bookiePendingMap.put(addr2, 20L);
+        bookiePendingMap.put(addr3, 0L); // not in write set
+        bookiePendingMap.put(addr4, 12L);
+        bookiePendingMap.put(addr5, 9L); // not in write set
+        bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
+        bookiePendingMap.put(addr7, 10L);
+        ArrayList<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
+        ensemble.add(addr1);
+        ensemble.add(addr2);
+        ensemble.add(addr3);
+        ensemble.add(addr4);
+        ensemble.add(addr5);
+        ensemble.add(addr6);
+        ensemble.add(addr7);
+
+        DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 3, 5, 
6);
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+                ensemble, getBookiesHealthInfo(new HashMap<>(), 
bookiePendingMap), writeSet);
+        DistributionSchedule.WriteSet expectedSet = writeSetFromValues(5, 1, 
3, 6);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("expect bookie idx 5 first", expectedSet, reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending 
requests.
+     * Expect no reordering in this case since the currently first bookie's 
number of
+     * pending requests is less than 
READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 higher
+     * than the best bookie's.
+     */
+    @Test
+    public void testPendingRequestsNoReorder1() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 10L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L); // best bookie, but below threshold
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, 
reorderSet);
+    }
+
+    /*
+     * Tests the reordering of the writeSet based on number of pending 
requests.
+     * Expect no reordering in this case since the currently first bookie's 
number of
+     * pending requests is lowest among all bookies already.
+     */
+    @Test
+    public void testPendingRequestsNoReorder2() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+        conf.setReorderThresholdPendingRequests(10);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1, 1L); // -> this one first
+        bookiePendingMap.put(addr2, 7L);
+        bookiePendingMap.put(addr3, 1L);
+        bookiePendingMap.put(addr4, 5L);
+
+        DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), 
writeSet);
+        LOG.info("reorder set : {}", reorderSet);
+        assertEquals("writeSet should be in original order", origWriteSet, 
reorderSet);
+    }
+
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRack() throws 
Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to