This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cc84042 ISSUE #709: Add Slow Bookkeeper Servers to Placement Policy
for read ordering
cc84042 is described below
commit cc84042a4d494fde42d215ead28093033da6861a
Author: Philip Su <[email protected]>
AuthorDate: Tue Dec 19 10:44:41 2017 -0800
ISSUE #709: Add Slow Bookkeeper Servers to Placement Policy for read
ordering
Descriptions of the changes in this PR:
Maintain a list of slow BookKeeper hosts, which are tried only after
read-only BookKeeper hosts are tried. A BookKeeper host is categorized as
"slow" when a speculative timeout occurs on that host.
Master Issue: #709
Author: Philip Su <[email protected]>
Author: Sijie Guo <[email protected]>
Author: philipsu522 <[email protected]>
Author: Ivan Kelly <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Yiming Zang
<[email protected]>, Jia Zhai <None>, Enrico Olivelli <[email protected]>
This closes #883 from philipsu522/psu/ISSUE-709-2, closes #709
---
.../bookie/LocalBookieEnsemblePlacementPolicy.java | 10 +-
.../bookkeeper/client/BookiesHealthInfo.java | 47 ++++++
.../client/DefaultEnsemblePlacementPolicy.java | 9 +-
.../bookkeeper/client/EnsemblePlacementPolicy.java | 27 +++-
.../org/apache/bookkeeper/client/LedgerHandle.java | 24 +++
.../apache/bookkeeper/client/PendingReadOp.java | 19 ++-
.../client/RackawareEnsemblePlacementPolicy.java | 8 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 151 ++++++++++++++++--
.../client/ReadLastConfirmedAndEntryOp.java | 17 ++-
.../client/RegionAwareEnsemblePlacementPolicy.java | 110 ++-----------
.../TopologyAwareEnsemblePlacementPolicy.java | 7 +-
.../bookkeeper/conf/ClientConfiguration.java | 23 +++
.../org/apache/bookkeeper/proto/BookieClient.java | 2 +-
.../proto/DefaultPerChannelBookieClientPool.java | 9 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 4 +
.../proto/PerChannelBookieClientPool.java | 6 +-
.../TestRackawareEnsemblePlacementPolicy.java | 157 ++++++++++++++++++-
.../TestRegionAwareEnsemblePlacementPolicy.java | 170 +++++++++++++++++++--
.../bookkeeper/client/TestSpeculativeRead.java | 25 ++-
19 files changed, 662 insertions(+), 163 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 2e87398..7c6bad9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.BookiesHealthInfo;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -88,9 +89,14 @@ public class LocalBookieEnsemblePlacementPolicy implements
EnsemblePlacementPoli
}
@Override
+ public void registerSlowBookie(BookieSocketAddress bookieSocketAddress,
long entryId) {
+ // This policy does not track slow bookies; registration is intentionally a no-op.
+ return;
+ }
+
+ @Override
public DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
+ // NOTE(review): returns null rather than a WriteSet; PendingReadOp uses the result
+ // directly when read reordering is enabled — confirm this policy is never combined
+ // with reordered reads.
return null;
}
@@ -98,7 +104,7 @@ public class LocalBookieEnsemblePlacementPolicy implements
EnsemblePlacementPoli
@Override
public DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
return null;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
new file mode 100644
index 0000000..b040440
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookiesHealthInfo.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+/**
+ * This interface returns heuristics used to determine the health of a
BookKeeper server for read
+ * ordering.
+ */
+public interface BookiesHealthInfo {
+
+ /**
+ * Returns the failure history for a bookie.
+ *
+ * @param bookieSocketAddress address of the bookie to query
+ * @return the last failed entry ID recorded for the bookie, or -1 if there have been no failures
+ */
+ long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress);
+
+ /**
+ * Returns the number of pending requests to a bookie.
+ *
+ * @param bookieSocketAddress address of the bookie to query
+ * @return number of pending requests to the bookie
+ */
+ long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress);
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index eab8954..219086d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -153,9 +153,14 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
+ public void registerSlowBookie(BookieSocketAddress bookieSocketAddress,
long entryId) {
+ // This policy does not reorder reads by bookie health, so slow bookies are ignored.
+ return;
+ }
+
+ @Override
public DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
return writeSet;
}
@@ -163,7 +168,7 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
@Override
public DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
writeSet.addMissingIndices(ensemble.size());
return writeSet;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 855ee75..7656a49 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -20,12 +20,12 @@ package org.apache.bookkeeper.client;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.DistributionSchedule.WriteSet;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -151,7 +151,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
*
* <h3>How to choose bookies to do speculative reads?</h3>
*
- * <p>{@link #reorderReadSequence(ArrayList, List, Map)} and {@link
#reorderReadLACSequence(ArrayList, List, Map)} are
+ * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, WriteSet)} and
+ * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, WriteSet)} are
* two methods exposed by the placement policy, to help client determine a
better read sequence according to the
* network topology and the bookie failure history.
*
@@ -290,12 +291,22 @@ public interface EnsemblePlacementPolicy {
throws BKNotEnoughBookiesException;
/**
+ * Register a bookie as slow so that it is tried after available and
read-only bookies.
+ * Policies that do not reorder reads by bookie health may treat this as a no-op.
+ *
+ * @param bookieSocketAddress
+ * Address of bookie host
+ * @param entryId
+ * Entry ID that caused a speculative timeout on the bookie.
+ */
+ void registerSlowBookie(BookieSocketAddress bookieSocketAddress, long
entryId);
+
+ /**
* Reorder the read sequence of a given write quorum <i>writeSet</i>.
*
* @param ensemble
* Ensemble to read entries.
- * @param bookieFailureHistory
- * Observed failures on the bookies
+ * @param bookiesHealthInfo
+ * Health info for bookies (failure history and pending requests)
* @param writeSet
* Write quorum to read entries. This will be modified, rather
than
* allocating a new WriteSet.
@@ -305,7 +316,7 @@ public interface EnsemblePlacementPolicy {
*/
DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet);
@@ -314,8 +325,8 @@ public interface EnsemblePlacementPolicy {
*
* @param ensemble
* Ensemble to read entries.
- * @param bookieFailureHistory
- * Observed failures on the bookies
+ * @param bookiesHealthInfo
+ * Health info for bookies (failure history and pending requests)
* @param writeSet
* Write quorum to read entries. This will be modified, rather
than
* allocating a new WriteSet.
@@ -325,7 +336,7 @@ public interface EnsemblePlacementPolicy {
*/
DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet);
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 32eeab8..d761530 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -77,6 +77,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
@@ -106,6 +107,7 @@ public class LedgerHandle implements WriteHandle {
final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
+ final BookiesHealthInfo bookiesHealthInfo;
final EnumSet<WriteFlag> writeFlags;
ScheduledFuture<?> timeoutFuture = null;
@@ -174,6 +176,19 @@ public class LedgerHandle implements WriteHandle {
return -1L;
}
});
+ // Expose this ledger's failure history and the client's pending-request counts
+ // to placement policies through the BookiesHealthInfo interface.
+ this.bookiesHealthInfo = new BookiesHealthInfo() {
+ @Override
+ public long getBookieFailureHistory(BookieSocketAddress
bookieSocketAddress) {
+ // getIfPresent does not invoke the cache loader, so queries do not populate the cache.
+ Long lastFailure =
bookieFailureHistory.getIfPresent(bookieSocketAddress);
+ return lastFailure == null ? -1L : lastFailure;
+ }
+
+ @Override
+ public long getBookiePendingRequests(BookieSocketAddress
bookieSocketAddress) {
+ // lookupClient may return null; report zero pending requests in that case.
+ PerChannelBookieClientPool pcbcPool =
bk.bookieClient.lookupClient(bookieSocketAddress);
+ return pcbcPool == null ? 0 :
pcbcPool.getNumPendingCompletionRequests();
+ }
+ };
ensembleChangeCounter =
bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
lacUpdateHitsCounter =
bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -333,6 +348,15 @@ public class LedgerHandle implements WriteHandle {
return distributionSchedule;
}
+ /**
+ * Get the health info for bookies for this ledger.
+ *
+ * @return BookiesHealthInfo backed by this ledger's bookie failure history and the client's pending request counts
+ */
+ BookiesHealthInfo getBookiesHealthInfo() {
+ return bookiesHealthInfo;
+ }
+
void writeLedgerConfig(GenericCallback<Void> writeCb) {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing metadata to ledger manager: {}, {}",
this.ledgerId, metadata.getVersion());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 2920231..32816bf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -86,16 +86,18 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
final ArrayList<BookieSocketAddress> ensemble;
final DistributionSchedule.WriteSet writeSet;
final LedgerEntryImpl entryImpl;
+ // Entry ID, kept so complete() can report it when registering slow bookies.
+ final long eId;
LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
+ this.eId = eId;
if (lh.bk.isReorderReadSequence()) {
writeSet = lh.bk.getPlacementPolicy()
.reorderReadSequence(
ensemble,
- lh.bookieFailureHistory.asMap(),
+ lh.getBookiesHealthInfo(),
lh.distributionSchedule.getWriteSet(eId));
} else {
writeSet = lh.distributionSchedule.getWriteSet(eId);
@@ -420,6 +422,21 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
}
}
+ @Override
+ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf
buffer) {
+ boolean completed = super.complete(bookieIndex, host, buffer);
+ if (completed) {
+ int numReplicasTried = getNextReplicaIndexToReadFrom();
+ // Check if any speculative reads were issued and mark any
slow bookies before
+ // the first successful speculative read as "slow"
+ // NOTE(review): the bound numReplicasTried - 1 assumes the last replica tried is the
+ // one whose response completed the read — confirm this holds when an earlier replica
+ // answers late.
+ for (int i = 0; i < numReplicasTried - 1; i++) {
+ int slowBookieIndex = writeSet.get(i);
+ BookieSocketAddress slowBookieSocketAddress =
ensemble.get(slowBookieIndex);
+
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, eId);
+ }
+ }
+ return completed;
+ }
}
PendingReadOp(LedgerHandle lh,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index db8797a..1691f1e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -121,18 +121,18 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
@Override
public DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
+ // Health info is passed through unchanged to the superclass implementation.
- return super.reorderReadSequence(ensemble, bookieFailureHistory,
+ return super.reorderReadSequence(ensemble, bookiesHealthInfo,
writeSet);
}
@Override
public DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
- return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+ return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
writeSet);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index f0572a6..735324b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -18,7 +18,11 @@
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
@@ -37,6 +41,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
@@ -87,7 +92,8 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
static final int REMOTE_MASK = 0x04 << 24;
static final int REMOTE_FAIL_MASK = 0x08 << 24;
static final int READ_ONLY_MASK = 0x10 << 24;
- static final int UNAVAIL_MASK = 0x20 << 24;
+ static final int SLOW_MASK = 0x20 << 24;
+ static final int UNAVAIL_MASK = 0x40 << 24;
static final int MASK_BITS = 0xFFF << 20;
static class DefaultResolver implements DNSToSwitchMapping {
@@ -183,6 +189,8 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
protected DNSToSwitchMapping dnsResolver;
protected HashedWheelTimer timer;
protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+ // Slow bookies expire after the configured failure-history expiration. Values are the
+ // entryId passed to registerSlowBookie; read reordering only checks key presence.
+ protected Cache<BookieSocketAddress, Long> slowBookies;
protected BookieNode localNode;
protected final ReentrantReadWriteLock rwLock;
protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
@@ -302,6 +310,14 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
}
}
+ slowBookies = CacheBuilder.newBuilder()
+ .expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(),
TimeUnit.MILLISECONDS)
+ .build(new CacheLoader<BookieSocketAddress, Long>() {
+ @Override
+ public Long load(BookieSocketAddress key) throws Exception {
+ return -1L;
+ }
+ });
return initialize(
dnsResolver,
timer,
@@ -790,34 +806,104 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
}
@Override
+ public void registerSlowBookie(BookieSocketAddress bookieSocketAddress,
long entryId) {
+ slowBookies.put(bookieSocketAddress, entryId);
+ }
+
+ @Override
public DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
+ Map<Integer, String> writeSetWithRegion = new HashMap<>();
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSetWithRegion.put(writeSet.get(i), "");
+ }
+ return reorderReadSequenceWithRegion(
+ ensemble, writeSet, writeSetWithRegion, bookiesHealthInfo, false,
"", writeSet.size());
+ }
+
+ /**
+ * This function orders the read sequence with a given region. For
region-unaware policies (e.g.
+ * RackAware), we pass in false for regionAware and an empty myRegion.
When this happens, any
+ * remote list will stay empty. The ordering is as follows (the R* at the
beginning of each list item
+ * is only present for region aware policies).
+ * 1. available (local) bookies
+ * 2. R* a remote bookie (based on remoteNodeInReorderSequence)
+ * 3. R* remaining (local) bookies
+ * 4. R* remaining remote bookies
+ * 5. read only bookies
+ * 6. slow bookies
+ * 7. unavailable bookies
+ * Slowness is only considered for bookies without a failure history; bookies with a
+ * failure history are ordered by their last failed entry ID instead. A read-only bookie
+ * that is registered as slow is placed with the slow bookies.
+ *
+ * @param ensemble
+ * ensemble of bookies
+ * @param writeSet
+ * write set
+ * @param writeSetWithRegion
+ * write set with region information
+ * @param bookiesHealthInfo
+ * heuristics about health of bookies
+ * @param regionAware
+ * whether or not a region-aware policy is used
+ * @param myRegion
+ * current region of policy
+ * @param remoteNodeInReorderSequence
+ * number of local bookies to try before trying a remote bookie
+ * @return ordering of bookies to send read to
+ */
+ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
+ ArrayList<BookieSocketAddress> ensemble,
+ DistributionSchedule.WriteSet writeSet,
+ Map<Integer, String> writeSetWithRegion,
+ BookiesHealthInfo bookiesHealthInfo,
+ boolean regionAware,
+ String myRegion,
+ int remoteNodeInReorderSequence) {
+ boolean useRegionAware = regionAware &&
(!myRegion.equals(UNKNOWN_REGION));
int ensembleSize = ensemble.size();
for (int i = 0; i < writeSet.size(); i++) {
int idx = writeSet.get(i);
BookieSocketAddress address = ensemble.get(idx);
- Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+ String region = writeSetWithRegion.get(idx);
+ Long lastFailedEntryOnBookie =
bookiesHealthInfo.getBookieFailureHistory(address);
if (null == knownBookies.get(address)) {
// there isn't too much differences between readonly bookies
// from unavailable bookies. since there
// is no write requests to them, so we shouldn't try reading
- // from readonly bookie in prior to writable
- // bookies.
+ // from readonly bookie prior to writable bookies.
if ((null == readOnlyBookies)
- || !readOnlyBookies.contains(address)) {
+ || !readOnlyBookies.contains(address)) {
writeSet.set(i, idx | UNAVAIL_MASK);
} else {
- writeSet.set(i, idx | READ_ONLY_MASK);
+ if (slowBookies.getIfPresent(address) != null) {
+ long numPendingReqs =
bookiesHealthInfo.getBookiePendingRequests(address);
+ // use slow bookies with less pending requests first
+ long slowIdx = numPendingReqs * ensembleSize + idx;
+ writeSet.set(i, (int) (slowIdx & ~MASK_BITS) |
SLOW_MASK);
+ } else {
+ writeSet.set(i, idx | READ_ONLY_MASK);
+ }
+ }
+ } else if (lastFailedEntryOnBookie < 0) {
+ if (slowBookies.getIfPresent(address) != null) {
+ long numPendingReqs =
bookiesHealthInfo.getBookiePendingRequests(address);
+ long slowIdx = numPendingReqs * ensembleSize + idx;
+ writeSet.set(i, (int) (slowIdx & ~MASK_BITS) | SLOW_MASK);
+ } else {
+ if (useRegionAware && !myRegion.equals(region)) {
+ writeSet.set(i, idx | REMOTE_MASK);
+ } else {
+ writeSet.set(i, idx | LOCAL_MASK);
+ }
}
} else {
- if ((lastFailedEntryOnBookie == null)
- || (lastFailedEntryOnBookie < 0)) {
- writeSet.set(i, idx | LOCAL_MASK);
+ // use bookies with earlier failed entryIds first
+ long failIdx = lastFailedEntryOnBookie * ensembleSize + idx;
+ if (useRegionAware && !myRegion.equals(region)) {
+ writeSet.set(i, (int) (failIdx & ~MASK_BITS) |
REMOTE_FAIL_MASK);
} else {
- long failIdx = lastFailedEntryOnBookie * ensembleSize +
idx;
writeSet.set(i, (int) (failIdx & ~MASK_BITS) |
LOCAL_FAIL_MASK);
}
}
@@ -836,15 +922,54 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
if (reorderReadsRandom) {
shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
}
- // remove all masks
+ // nodes within a region are ordered as follows
+ // (Random?) list of nodes that have no history of failure
+ // Nodes with Failure history are ordered in the reverse
+ // order of the most recent entry that generated an error
+ // The sort will have put them in correct order,
+ // so remove the bits that sort by age.
+ // NOTE(review): encoded failIdx/slowIdx values keep only the low 20 bits (~MASK_BITS), so
+ // `% ensembleSize` recovers the original index only while the encoded value is below 2^20;
+ // confirm large entry IDs or pending-request counts cannot map a bookie to the wrong index.
+ for (int i = 0; i < writeSet.size(); i++) {
+ int mask = writeSet.get(i) & MASK_BITS;
+ int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
+ if (mask == LOCAL_FAIL_MASK) {
+ writeSet.set(i, LOCAL_MASK | idx);
+ } else if (mask == REMOTE_FAIL_MASK) {
+ writeSet.set(i, REMOTE_MASK | idx);
+ } else if (mask == SLOW_MASK) {
+ writeSet.set(i, SLOW_MASK | idx);
+ }
+ }
+
+ // Insert a node from the remote region at the specified location so
+ // we try more than one region within the max allowed latency
+ int firstRemote = -1;
for (int i = 0; i < writeSet.size(); i++) {
- writeSet.set(i, (writeSet.get(i) & ~MASK_BITS) % ensembleSize);
+ if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
+ firstRemote = i;
+ break;
+ }
+ }
+ if (firstRemote != -1) {
+ int i = 0;
+ for (; i < remoteNodeInReorderSequence
+ && i < writeSet.size(); i++) {
+ if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
+ break;
+ }
+ }
+ writeSet.moveAndShift(firstRemote, i);
}
+
+ // remove all masks
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+ }
return writeSet;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index fe2f64a..07cb318 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -83,7 +83,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
this.writeSet = lh.distributionSchedule.getWriteSet(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
- lh.bookieFailureHistory.asMap(), writeSet.copy());
+ lh.getBookiesHealthInfo(), writeSet.copy());
} else {
this.orderedEnsemble = writeSet.copy();
}
@@ -407,6 +407,21 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
}
}
+ @Override
+ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf
buffer, long entryId) {
+ boolean completed = super.complete(bookieIndex, host, buffer,
entryId);
+ if (completed) {
+ int numReplicasTried = getNextReplicaIndexToReadFrom();
+ // Check if any speculative reads were issued and mark any
bookies before the
+ // first speculative read as slow
+ // NOTE(review): unlike PendingReadOp, whose loop stops at numReplicasTried - 1, this loop
+ // also covers the last replica tried, which contradicts the comment above. Confirm whether
+ // the bookie that completed the read is meant to be registered as slow.
+ for (int i = 0; i < numReplicasTried; i++) {
+ int slowBookieIndex = orderedEnsemble.get(i);
+ BookieSocketAddress slowBookieSocketAddress =
ensemble.get(slowBookieIndex);
+
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+ }
+ }
+ return completed;
+ }
}
ReadLastConfirmedAndEntryOp(LedgerHandle lh,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index ba8798e..f990b3f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -521,122 +521,30 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
@Override
public final DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
if (UNKNOWN_REGION.equals(myRegion)) {
- return super.reorderReadSequence(ensemble, bookieFailureHistory,
- writeSet);
+ return super.reorderReadSequence(ensemble, bookiesHealthInfo,
writeSet);
} else {
- int ensembleSize = ensemble.size();
-
+ // Record each write-set index's region so the shared routine can separate
+ // local bookies from remote ones.
+ Map<Integer, String> writeSetWithRegion = new HashMap<>();
for (int i = 0; i < writeSet.size(); i++) {
int idx = writeSet.get(i);
- BookieSocketAddress address = ensemble.get(idx);
- String region = getRegion(address);
- Long lastFailedEntryOnBookie =
bookieFailureHistory.get(address);
- if (null == knownBookies.get(address)) {
- // there isn't too much differences between readonly
bookies
- // from unavailable bookies. since there
- // is no write requests to them, so we shouldn't try
reading
- // from readonly bookie in prior to writable bookies.
- if ((null == readOnlyBookies)
- || !readOnlyBookies.contains(address)) {
- writeSet.set(i, idx | UNAVAIL_MASK);
- } else {
- writeSet.set(i, idx | READ_ONLY_MASK);
- }
- } else if (region.equals(myRegion)) {
- if ((lastFailedEntryOnBookie == null)
- || (lastFailedEntryOnBookie < 0)) {
- writeSet.set(i, idx | LOCAL_MASK);
- } else {
- long failIdx = lastFailedEntryOnBookie * ensembleSize
+ idx;
- writeSet.set(i, (int) (failIdx & ~MASK_BITS) |
LOCAL_FAIL_MASK);
- }
- } else {
- if ((lastFailedEntryOnBookie == null)
- || (lastFailedEntryOnBookie < 0)) {
- writeSet.set(i, idx | REMOTE_MASK);
- } else {
- long failIdx = lastFailedEntryOnBookie * ensembleSize
+ idx;
- writeSet.set(i, (int) (failIdx & ~MASK_BITS)
- | REMOTE_FAIL_MASK);
- }
- }
- }
-
- // Add a mask to ensure the sort is stable, sort,
- // and then remove mask. This maintains stability as
- // long as there are fewer than 16 bookies in the write set.
- for (int i = 0; i < writeSet.size(); i++) {
- writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
- }
- writeSet.sort();
- for (int i = 0; i < writeSet.size(); i++) {
- writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
- }
-
- if (reorderReadsRandom) {
- shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
- shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
- shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
- shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
- }
-
- // nodes within a region are ordered as follows
- // (Random?) list of nodes that have no history of failure
- // Nodes with Failure history are ordered in the reverse
- // order of the most recent entry that generated an error
- // The sort will have put them in correct order,
- // so remove the bits that sort by age.
- for (int i = 0; i < writeSet.size(); i++) {
- int mask = writeSet.get(i) & MASK_BITS;
- int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
- if (mask == LOCAL_FAIL_MASK) {
- writeSet.set(i, LOCAL_MASK | idx);
- } else if (mask == REMOTE_FAIL_MASK) {
- writeSet.set(i, REMOTE_MASK | idx);
- }
- }
-
- // Insert a node from the remote region at the specified location
so
- // we try more than one region within the max allowed latency
- int firstRemote = -1;
- for (int i = 0; i < writeSet.size(); i++) {
- if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
- firstRemote = i;
- break;
- }
- }
- if (firstRemote != -1) {
- int i = 0;
- for (; i < REMOTE_NODE_IN_REORDER_SEQUENCE && i <
writeSet.size(); i++) {
- if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
- break;
- }
- }
- writeSet.moveAndShift(firstRemote, i);
- }
-
-
- // remove all masks
- for (int i = 0; i < writeSet.size(); i++) {
- writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+ writeSetWithRegion.put(idx, getRegion(ensemble.get(idx)));
}
- return writeSet;
+ return super.reorderReadSequenceWithRegion(ensemble, writeSet,
writeSetWithRegion,
+ bookiesHealthInfo, true, myRegion,
REMOTE_NODE_IN_REORDER_SEQUENCE);
}
}
@Override
public final DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
if (UNKNOWN_REGION.equals(myRegion)) {
- return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
- writeSet);
+ return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
writeSet);
}
- DistributionSchedule.WriteSet finalList =
reorderReadSequence(ensemble, bookieFailureHistory, writeSet);
+ DistributionSchedule.WriteSet finalList =
reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
finalList.addMissingIndices(ensemble.size());
return finalList;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 88953b3..b70cb0f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -461,7 +460,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
@Override
public DistributionSchedule.WriteSet reorderReadSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
return writeSet;
}
@@ -469,10 +468,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
@Override
public DistributionSchedule.WriteSet reorderReadLACSequence(
ArrayList<BookieSocketAddress> ensemble,
- Map<BookieSocketAddress, Long> bookieFailureHistory,
+ BookiesHealthInfo bookiesHealthInfo,
DistributionSchedule.WriteSet writeSet) {
DistributionSchedule.WriteSet retList = reorderReadSequence(
- ensemble, bookieFailureHistory, writeSet);
+ ensemble, bookiesHealthInfo, writeSet);
retList.addMissingIndices(ensemble.size());
return retList;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 257c367..4d9c774 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -112,6 +112,8 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
// Ensemble Placement Policy
protected static final String ENSEMBLE_PLACEMENT_POLICY =
"ensemblePlacementPolicy";
protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS =
"networkTopologyStabilizePeriodSeconds";
+ protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES
=
+ "ensemblePlacementPolicyOrderSlowBookies";
// Ledger Metadata Parameters
protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME =
"storeSystemTimeAsLedgerCreationTime";
@@ -1089,6 +1091,27 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
}
/**
+ * Whether to order slow bookies in placement policy.
+ *
+ * @return flag of whether to order slow bookies in placement policy or
not.
+ */
+ public boolean getEnsemblePlacementPolicySlowBookies() {
+ return getBoolean(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, false);
+ }
+
+ /**
+ * Enable/Disable ordering slow bookies in placement policy.
+ *
+ * @param enabled
+ * flag to enable/disable ordering slow bookies in placement
policy.
+ * @return client configuration.
+ */
+ public ClientConfiguration setEnsemblePlacementPolicySlowBookies(boolean
enabled) {
+ setProperty(ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES, enabled);
+ return this;
+ }
+
+ /**
* Whether to enable recording task execution stats.
*
* @return flag to enable/disable recording task execution stats.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 9b1ca9c..08ed194 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -157,7 +157,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
authProviderFactory, registry,
pcbcPool, shFactory);
}
- private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
+ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
PerChannelBookieClientPool clientPool = channels.get(addr);
if (null == clientPool) {
closeLock.readLock().lock();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 41233cf..e57b8a7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -119,4 +119,13 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
pcbc.close(wait);
}
}
+
+ @Override
+ public long getNumPendingCompletionRequests() {
+ long numPending = 0;
+ for (PerChannelBookieClient pcbc : clients) {
+ numPending += pcbc.getNumPendingCompletionRequests();
+ }
+ return numPending;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6e64b37..e0199ff 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -355,6 +355,10 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
+ protected long getNumPendingCompletionRequests() {
+ return completionObjects.size();
+ }
+
protected ChannelFuture connect() {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to bookie: {}", addr);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 80f00a5..97a6a26 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -25,7 +25,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
/**
* An interface to manage channel pooling for bookie client.
*/
-interface PerChannelBookieClientPool {
+public interface PerChannelBookieClientPool {
/**
* intialize the pool. the implementation should not be blocked.
@@ -67,4 +67,8 @@ interface PerChannelBookieClientPool {
*/
void close(boolean wait);
+ /**
+ * Get the number of pending completion requests in the channel.
+ */
+ long getNumPendingCompletionRequests();
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1ce8f0c..637df1b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -104,6 +104,25 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
super.tearDown();
}
+ static BookiesHealthInfo getBookiesHealthInfo() {
+ return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+ }
+
+ static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress,
Long> bookieFailureHistory,
+ Map<BookieSocketAddress,
Long> bookiePendingRequests) {
+ return new BookiesHealthInfo() {
+ @Override
+ public long getBookieFailureHistory(BookieSocketAddress
bookieSocketAddress) {
+ return bookieFailureHistory.getOrDefault(bookieSocketAddress,
-1L);
+ }
+
+ @Override
+ public long getBookiePendingRequests(BookieSocketAddress
bookieSocketAddress) {
+ return bookiePendingRequests.getOrDefault(bookieSocketAddress,
0L);
+ }
+ };
+ }
+
static void updateMyRack(String rack) throws Exception {
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
rack);
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostName(), rack);
@@ -131,7 +150,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(),
+ ensemble, getBookiesHealthInfo(),
writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2,
3, 0);
LOG.info("reorder set : {}", reorderSet);
@@ -162,7 +181,37 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2,
3, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
+ public void testNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2,
3, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
@@ -170,6 +219,38 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
}
@Test
+ public void testTwoNodesSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ repp.registerSlowBookie(addr2, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ bookiePendingMap.put(addr2, 2L);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3,
0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
public void testTwoNodesDown() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
@@ -191,7 +272,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3,
0, 1);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
@@ -221,13 +302,79 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, roAddrs);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3,
1, 0);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
+ public void testNodeDownAndNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ addrs.remove(addr2);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3,
0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
+ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ addrs.remove(addr1);
+ addrs.remove(addr2);
+ Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
+ ro.add(addr2);
+ repp.registerSlowBookie(addr3, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr3, 1L);
+ addrs.remove(addr2);
+ repp.onClusterChanged(addrs, ro);
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1,
2, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
public void testReplaceBookieWithEnoughBookiesInSameRack() throws
Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -738,7 +885,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
bookieFailures.put(addr2, 22L);
DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
- ensemble, bookieFailures, writeSet);
+ ensemble, getBookiesHealthInfo(bookieFailures, new
HashMap<>()), writeSet);
LOG.info("reorder set : {}", reoderSet);
assertEquals(ensemble.get(reoderSet.get(2)), addr1);
assertEquals(ensemble.get(reoderSet.get(3)), addr2);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 1dcf0e1..fe75b31 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -118,6 +119,26 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
super.tearDown();
}
+ static BookiesHealthInfo getBookiesHealthInfo() {
+ return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
+ }
+
+ static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress,
Long> bookieFailureHistory,
+ Map<BookieSocketAddress,
Long> bookiePendingRequests) {
+ return new BookiesHealthInfo() {
+ @Override
+ public long getBookieFailureHistory(BookieSocketAddress
bookieSocketAddress) {
+ return bookieFailureHistory.getOrDefault(bookieSocketAddress,
-1L);
+ }
+
+ @Override
+ public long getBookiePendingRequests(BookieSocketAddress
bookieSocketAddress) {
+ return bookiePendingRequests.getOrDefault(bookieSocketAddress,
0L);
+ }
+ };
+ }
+
+
@Test
public void testNotReorderReadIfInDefaultRack() throws Exception {
repp.uninitalize();
@@ -128,7 +149,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
assertEquals(origWriteSet, reorderSet);
}
@@ -151,7 +172,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(),
writeSet.copy());
+ ensemble, getBookiesHealthInfo(), writeSet.copy());
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(0, 3,
1, 2);
LOG.info("write set : {}", writeSet);
LOG.info("reorder set : {}", reorderSet);
@@ -171,7 +192,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
LOG.info("reorder set : {}", reorderSet);
assertEquals(origWriteSet, reorderSet);
}
@@ -196,7 +217,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1,
2, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
@@ -225,7 +246,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1,
2, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
@@ -233,6 +254,66 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
@Test
+ public void testNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1,
2, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
+ public void testTwoNodesSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ repp.registerSlowBookie(addr2, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ bookiePendingMap.put(addr2, 2L);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2,
0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
public void testTwoNodesDown() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
@@ -253,7 +334,37 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ ensemble, getBookiesHealthInfo(), writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2,
0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
+ public void testNodeDownAndNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ repp.registerSlowBookie(addr1, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1, 1L);
+ addrs.remove(addr2);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 2,
0, 1);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
@@ -261,6 +372,39 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
@Test
+ public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ addrs.remove(addr1);
+ addrs.remove(addr2);
+ Set<BookieSocketAddress> ro = new HashSet<>();
+ ro.add(addr2);
+ repp.registerSlowBookie(addr3, 0L);
+ Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr3, 1L);
+ repp.onClusterChanged(addrs, ro);
+
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1,
2, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
+ }
+
+ @Test
public void testReplaceBookieWithEnoughBookiesInSameRegion() throws
Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -1068,11 +1212,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
if (isReadLAC) {
readSet = repp.reorderReadLACSequence(
ensemble,
- new HashMap<BookieSocketAddress, Long>(), writeSet);
+ getBookiesHealthInfo(), writeSet);
} else {
readSet = repp.reorderReadSequence(
ensemble,
- new HashMap<BookieSocketAddress, Long>(), writeSet);
+ getBookiesHealthInfo(), writeSet);
}
LOG.info("Reorder {} => {}.", origWriteSet, readSet);
@@ -1124,12 +1268,12 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
if (isReadLAC) {
readSet = repp.reorderReadLACSequence(
ensemble,
- new HashMap<BookieSocketAddress, Long>(),
+ getBookiesHealthInfo(),
writeSet.copy());
} else {
readSet = repp.reorderReadSequence(
ensemble,
- new HashMap<BookieSocketAddress, Long>(),
+ getBookiesHealthInfo(),
writeSet.copy());
}
@@ -1199,11 +1343,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
DistributionSchedule.WriteSet readSet;
if (isReadLAC) {
readSet = repp.reorderReadLACSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(),
+ ensemble, getBookiesHealthInfo(),
writeSet.copy());
} else {
readSet = repp.reorderReadSequence(
- ensemble, new HashMap<BookieSocketAddress, Long>(),
+ ensemble, getBookiesHealthInfo(),
writeSet.copy());
}
@@ -1293,7 +1437,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
LOG.info("write set : {}", writeSet2);
DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
- ensemble, bookieFailures, writeSet2);
+ ensemble, getBookiesHealthInfo(bookieFailures, new
HashMap<>()), writeSet2);
LOG.info("reorder set : {}", reoderSet);
assertEquals(ensemble.get(reoderSet.get(0)), addr6);
assertEquals(ensemble.get(reoderSet.get(1)), addr7);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 41e5b2e..d692cea 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.client;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -27,6 +28,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -69,7 +72,9 @@ public class TestSpeculativeRead extends
BookKeeperClusterTestCase {
BookKeeper createClient(int specTimeout) throws Exception {
ClientConfiguration conf = new ClientConfiguration()
.setSpeculativeReadTimeout(specTimeout)
- .setReadTimeout(30000);
+ .setReadTimeout(30000)
+ .setReorderReadSequenceEnabled(true)
+ .setEnsemblePlacementPolicySlowBookies(true);
conf.setZkServers(zkUtil.getZooKeeperConnectString());
return new BookKeeper(conf);
}
@@ -150,6 +155,10 @@ public class TestSpeculativeRead extends
BookKeeperClusterTestCase {
lspec.asyncReadEntries(1, 1, speccb, null);
speccb.expectSuccess(4000);
nospeccb.expectTimeout(4000);
+ // Check that the second bookie is registered as slow at entryId 1
+ RackawareEnsemblePlacementPolicy rep =
(RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy;
+ assertTrue(rep.slowBookies.asMap().size() == 1);
+ assertTrue(rep.slowBookies.asMap().get(second) == 1L);
} finally {
sleepLatch.countDown();
lspec.close();
@@ -195,16 +204,18 @@ public class TestSpeculativeRead extends
BookKeeperClusterTestCase {
latch1.getDuration() >= timeout * 2
&& latch1.getDuration() < timeout * 3);
- // third should have to hit one timeouts (bookie 2)
+ // bookies 1 & 2 should be registered as slow bookies because of
speculative reads
+ Set<BookieSocketAddress> expectedSlowBookies = new HashSet<>();
+
expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(1));
+
expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(2));
+ assertEquals(((RackawareEnsemblePlacementPolicy)
l.bk.placementPolicy).slowBookies.asMap().keySet(),
+ expectedSlowBookies);
+
+ // third should not hit timeouts since bookies 1 & 2 are
registered as slow
// bookie 3 has the entry
LatchCallback latch2 = new LatchCallback();
l.asyncReadEntries(2, 2, latch2, null);
- latch2.expectTimeout(timeout / 2);
latch2.expectSuccess(timeout);
- LOG.info("Timeout {} latch2 duration {}", timeout,
latch2.getDuration());
- assertTrue("should have taken longer than one timeout, but less
than 2",
- latch2.getDuration() >= timeout
- && latch2.getDuration() < timeout * 2);
// fourth should have no timeout
// bookie 3 has the entry
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].