This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 134a6c2 Issue #1489: Better Prevent Read Outliers during short-term
Bookie Slow-Down
134a6c2 is described below
commit 134a6c25a846731dc2ad0dc6c3e848170557b7b9
Author: Nicolas Michael <[email protected]>
AuthorDate: Wed Jun 13 17:41:02 2018 -0700
Issue #1489: Better Prevent Read Outliers during short-term Bookie Slow-Down
### Motivation
Bookies can temporarily be slow for a large number of reasons, often only
briefly (from a few milliseconds to a few seconds), such as during Java garbage
collection or EntryLog compaction. For writes, latencies of individual bookies
are masked by acknowledging the client after a quorum of bookies have replied.
However, for reads there is not yet an equivalent feature to mask short-term
latencies of individual bookies (in the case of SequenceReadRequests). This PR
implements such a feature b [...]
### Changes
This change implements a configurable reordering of read requests in the
BookKeeper client based on the number of pending requests to each bookie that
could service the request. The intention is to mask the latency of one bookie
by directing a read request to another bookie that could potentially service
the request faster. This should help prevent read time outliers due to bookies
that are temporarily responding slowly, for example due to Java garbage
collection, compaction, or any other ki [...]
Reordering of reads is based on a threshold of relative queue length to
other bookies. Setting the threshold very low will more frequently reorder the
read set and potentially result in better latency, but will also reduce data
affinity of reads. Reads sent to a bookie other than the preferred one have a low
chance of being served from the file system cache on that bookie, and will likely
result in a physical read. Small thresholds therefore shuffle read requests
more among bookies and may lead to [...]
Master Issue: #1489
Author: Nicolas Michael <[email protected]>
Reviewers: Yiming Zang <[email protected]>, Sijie Guo <[email protected]>
This closes #1504 from nicmichael/ReadReordering, closes #1489
---
.../bookkeeper/client/BookKeeperClientStats.java | 1 +
.../org/apache/bookkeeper/client/LedgerHandle.java | 10 +-
.../client/RackawareEnsemblePlacementPolicy.java | 15 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 51 ++++++-
.../client/RegionAwareEnsemblePlacementPolicy.java | 6 +-
.../bookkeeper/conf/ClientConfiguration.java | 25 ++++
.../TestRackawareEnsemblePlacementPolicy.java | 166 +++++++++++++++++++++
7 files changed, 264 insertions(+), 10 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 83e6421..749ac9c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -54,6 +54,7 @@ public interface BookKeeperClientStats {
String LAC_UPDATE_MISSES = "LAC_UPDATE_MISSES";
String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
+ String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
// per channel stats
String CHANNEL_SCOPE = "per_channel_bookie_client";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 9f90c5a..a2a37e9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -99,6 +99,8 @@ import org.slf4j.LoggerFactory;
public class LedgerHandle implements WriteHandle {
static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
+ static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
+
final byte[] ledgerKey;
LedgerMetadata metadata;
final BookKeeper bk;
@@ -224,7 +226,13 @@ public class LedgerHandle implements WriteHandle {
@Override
public long getBookiePendingRequests(BookieSocketAddress
bookieSocketAddress) {
PerChannelBookieClientPool pcbcPool =
bk.bookieClient.lookupClient(bookieSocketAddress);
- return pcbcPool == null ? 0 :
pcbcPool.getNumPendingCompletionRequests();
+ if (pcbcPool == null) {
+ return 0;
+ } else if (pcbcPool.isWritable(ledgerId)) {
+ return pcbcPool.getNumPendingCompletionRequests();
+ } else {
+ return pcbcPool.getNumPendingCompletionRequests() |
PENDINGREQ_NOTWRITABLE_MASK;
+ }
}
};
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 32f94f3..e3ce8ff 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -50,19 +50,22 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
HashedWheelTimer
timer,
boolean
reorderReadsRandom,
int
stabilizePeriodSeconds,
+ int
reorderThresholdPendingRequests,
boolean isWeighted,
int
maxWeightMultiple,
int
minNumRacksPerWriteQuorum,
StatsLogger
statsLogger) {
if (stabilizePeriodSeconds > 0) {
- super.initialize(dnsResolver, timer, reorderReadsRandom, 0,
isWeighted, maxWeightMultiple,
- minNumRacksPerWriteQuorum, statsLogger);
- slave = new
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
- slave.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds, isWeighted,
+ super.initialize(dnsResolver, timer, reorderReadsRandom, 0,
reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+ slave = new
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+ slave.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
+ reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple,
+ minNumRacksPerWriteQuorum, statsLogger);
} else {
- super.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds, isWeighted,
- maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+ super.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
+ reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple,
+ minNumRacksPerWriteQuorum, statsLogger);
slave = null;
}
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7416372..23afaa9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -198,11 +198,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
protected boolean reorderReadsRandom = false;
protected boolean enforceDurability = false;
protected int stabilizePeriodSeconds = 0;
+ protected int reorderThresholdPendingRequests = 0;
// looks like these only assigned in the same thread as constructor,
immediately after constructor;
// no need to make volatile
protected StatsLogger statsLogger = null;
protected OpStatsLogger bookiesJoinedCounter = null;
protected OpStatsLogger bookiesLeftCounter = null;
+ protected OpStatsLogger readReorderedCounter = null;
private String defaultRack = NetworkTopology.DEFAULT_RACK;
@@ -232,6 +234,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
HashedWheelTimer
timer,
boolean
reorderReadsRandom,
int
stabilizePeriodSeconds,
+ int
reorderThresholdPendingRequests,
boolean
isWeighted,
int
maxWeightMultiple,
int
minNumRacksPerWriteQuorum,
@@ -240,8 +243,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.statsLogger = statsLogger;
this.bookiesJoinedCounter =
statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_JOINED);
this.bookiesLeftCounter =
statsLogger.getOpStatsLogger(BookKeeperServerStats.BOOKIES_LEFT);
+ this.readReorderedCounter =
statsLogger.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
this.reorderReadsRandom = reorderReadsRandom;
this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+ this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack());
this.timer = timer;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
@@ -330,6 +335,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
timer,
conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
conf.getNetworkTopologyStabilizePeriodSeconds(),
+ conf.getReorderThresholdPendingRequests(),
conf.getDiskWeightBasedPlacementEnabled(),
conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
conf.getMinNumRacksPerWriteQuorum(),
@@ -952,7 +958,11 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
@Override
public void registerSlowBookie(BookieSocketAddress bookieSocketAddress,
long entryId) {
- slowBookies.put(bookieSocketAddress, entryId);
+ if (reorderThresholdPendingRequests <= 0) {
+ // only put bookies on slowBookies list if
reorderThresholdPendingRequests is *not* set (0);
+ // otherwise, rely on reordering of reads based on
reorderThresholdPendingRequests
+ slowBookies.put(bookieSocketAddress, entryId);
+ }
}
@Override
@@ -1026,7 +1036,45 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
}
}
+ boolean reordered = false;
+ if (reorderThresholdPendingRequests > 0) {
+ // if there are no slow or unavailable bookies, capture each
bookie's number of
+ // pending request to reorder requests based on a threshold of
pending requests
+
+ // number of pending requests per bookie (same index as writeSet)
+ long[] pendingReqs = new long[writeSet.size()];
+ int bestBookieIdx = -1;
+
+ for (int i = 0; i < writeSet.size(); i++) {
+ pendingReqs[i] =
bookiesHealthInfo.getBookiePendingRequests(ensemble.get(writeSet.get(i)));
+ if (bestBookieIdx < 0 || pendingReqs[i] <
pendingReqs[bestBookieIdx]) {
+ bestBookieIdx = i;
+ }
+ }
+
+ // reorder the writeSet if the currently first bookie in our
writeSet has at
+ // least
+ // reorderThresholdPendingRequests more outstanding request than
the best bookie
+ if (bestBookieIdx > 0 && pendingReqs[0] >=
pendingReqs[bestBookieIdx] + reorderThresholdPendingRequests) {
+ // We're not reordering the entire write set, but only move
the best bookie
+ // to the first place. Chances are good that this bookie will
be fast enough
+ // to not trigger the speculativeReadTimeout. But even if it
hits that timeout,
+ // things may have changed by then so much that whichever
bookie we put second
+ // may actually not be the second-best choice any more.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read set reordered from {} ({} pending) to {}
({} pending)",
+ ensemble.get(writeSet.get(0)), pendingReqs[0],
ensemble.get(writeSet.get(bestBookieIdx)),
+ pendingReqs[bestBookieIdx]);
+ }
+ writeSet.moveAndShift(bestBookieIdx, 0);
+ reordered = true;
+ }
+ }
+
if (!isAnyBookieUnavailable) {
+ if (reordered) {
+ readReorderedCounter.registerSuccessfulValue(1);
+ }
return writeSet;
}
@@ -1137,6 +1185,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
for (int i = 0; i < writeSet.size(); i++) {
writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
}
+ readReorderedCounter.registerSuccessfulValue(1);
return writeSet;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 6205174..218781b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -130,7 +130,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
if (null == perRegionPlacement.get(region)) {
perRegionPlacement.put(region, new
RackawareEnsemblePlacementPolicy()
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
- this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, statsLogger)
+ this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
+ this.minNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
@@ -178,7 +179,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
for (String region: regions) {
perRegionPlacement.put(region, new
RackawareEnsemblePlacementPolicy(true)
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
- this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, statsLogger)
+ this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
+ this.minNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
minRegionsForDurability =
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 9f09c37..340b40b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -149,6 +149,7 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
// Ensemble Placement Policy
protected static final String ENSEMBLE_PLACEMENT_POLICY =
"ensemblePlacementPolicy";
protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS =
"networkTopologyStabilizePeriodSeconds";
+ protected static final String READ_REORDER_THRESHOLD_PENDING_REQUESTS =
"readReorderThresholdPendingRequests";
protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES
=
"ensemblePlacementPolicyOrderSlowBookies";
@@ -1127,6 +1128,30 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
}
/**
+ * Get the threshold for the number of pending requests beyond which to
reorder
+ * reads. If <= zero, this feature is turned off.
+ *
+ * @return the threshold for the number of pending requests beyond which to
+ * reorder reads.
+ */
+ public int getReorderThresholdPendingRequests() {
+ return getInt(READ_REORDER_THRESHOLD_PENDING_REQUESTS, 0);
+ }
+
+ /**
+ * Set the threshold for the number of pending requests beyond which to
reorder
+ * reads. If zero, this feature is turned off.
+ *
+ * @param threshold
+ * The threshold for the number of pending requests beyond
which to
+ * reorder reads.
+ */
+ public ClientConfiguration setReorderThresholdPendingRequests(int
threshold) {
+ setProperty(READ_REORDER_THRESHOLD_PENDING_REQUESTS, threshold);
+ return this;
+ }
+
+ /**
* Get the network topology stabilize period in seconds. if it is zero,
this feature is turned off.
*
* @return network topology stabilize period in seconds.
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index ad9c9c8..cdff679 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -372,6 +372,172 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
assertEquals(expectedSet, reorderSet);
}
+ /*
+ * Tests the reordering of the writeSet based on number of pending
requests.
+ * Expect the third bookie to be placed first since its number of pending
requests
+ * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally
first bookie.
+ */
+ @Test
+ public void testPendingRequestsReorder() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+ conf.setReorderThresholdPendingRequests(10);
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 20L);
+ bookiePendingMap.put(addr2, 7L);
+ bookiePendingMap.put(addr3, 1L); // best bookie -> this one first
+ bookiePendingMap.put(addr4, 5L);
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 0,
1, 3);
+ LOG.info("reorder set : {}", reorderSet);
+ assertEquals("expect bookie idx 2 first", expectedSet, reorderSet);
+ }
+
+ /*
+ * Tests the reordering of the writeSet based on number of pending
requests for
+ * an ensemble that is larger than the writeSet.
+ * Expect the sixth bookie to be placed first since its number of pending
requests
+ * is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally
first bookie.
+ */
+ @Test
+ public void testPendingRequestsReorderLargeEnsemble() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+ conf.setReorderThresholdPendingRequests(10);
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ addrs.add(addr5);
+ addrs.add(addr6);
+ addrs.add(addr7);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L); // not in write set
+ bookiePendingMap.put(addr2, 20L);
+ bookiePendingMap.put(addr3, 0L); // not in write set
+ bookiePendingMap.put(addr4, 12L);
+ bookiePendingMap.put(addr5, 9L); // not in write set
+ bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
+ bookiePendingMap.put(addr7, 10L);
+ ArrayList<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
+ ensemble.add(addr1);
+ ensemble.add(addr2);
+ ensemble.add(addr3);
+ ensemble.add(addr4);
+ ensemble.add(addr5);
+ ensemble.add(addr6);
+ ensemble.add(addr7);
+
+ DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 3, 5,
6);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(),
bookiePendingMap), writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(5, 1,
3, 6);
+ LOG.info("reorder set : {}", reorderSet);
+ assertEquals("expect bookie idx 5 first", expectedSet, reorderSet);
+ }
+
+ /*
+ * Tests the reordering of the writeSet based on number of pending
requests.
+ * Expect no reordering in this case since the currently first bookie's
number of
+ * pending requests is less than
READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 lower
+ * than the best bookie.
+ */
+ @Test
+ public void testPendingRequestsNoReorder1() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+ conf.setReorderThresholdPendingRequests(10);
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 10L); // -> this one first
+ bookiePendingMap.put(addr2, 7L);
+ bookiePendingMap.put(addr3, 1L); // best bookie, but below threshold
+ bookiePendingMap.put(addr4, 5L);
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ LOG.info("reorder set : {}", reorderSet);
+ assertEquals("writeSet should be in original order", origWriteSet,
reorderSet);
+ }
+
+ /*
+ * Tests the reordering of the writeSet based on number of pending
requests.
+ * Expect no reordering in this case since the currently first bookie's
number of
+ * pending requests is lowest among all bookies already.
+ */
+ @Test
+ public void testPendingRequestsNoReorder2() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
+ conf.setReorderThresholdPendingRequests(10);
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L); // -> this one first
+ bookiePendingMap.put(addr2, 7L);
+ bookiePendingMap.put(addr3, 1L);
+ bookiePendingMap.put(addr4, 5L);
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ LOG.info("reorder set : {}", reorderSet);
+ assertEquals("writeSet should be in original order", origWriteSet,
reorderSet);
+ }
+
@Test
public void testReplaceBookieWithEnoughBookiesInSameRack() throws
Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
--
To stop receiving notification emails like this one, please contact
[email protected].