This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 737f92b Add semantically meaningful return values to placement policy
737f92b is described below
commit 737f92b2782fc0d027c51ea6bf88cb6b802398db
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Jan 25 05:09:10 2019 +0100
Add semantically meaningful return values to placement policy
Currently newEnsemble and replaceBookie in EnsemblePlacementPolicy
return an Apache Commons Pair<> with the second element being a
boolean to denote whether the placement conforms strictly to the
policy. From calling code, the meaning of this second value is
unclear.
This patch replaces Pair<> with a PlacementResult object, in which
the strict conformity flag is clearly labeled. This will also allow
extension in the future to return more metadata about particular
placements.
Also, we shouldn't put third party library classes in interfaces.
Issue: #1914
Reviewers: Enrico Olivelli <[email protected]>, Charan Reddy Guttapalem
<[email protected]>, Sijie Guo <[email protected]>
This closes #1916 from ivankelly/meaningful-placement-res
---
.../bookie/LocalBookieEnsemblePlacementPolicy.java | 7 +-
.../apache/bookkeeper/client/BookKeeperAdmin.java | 7 +-
.../bookkeeper/client/BookieWatcherImpl.java | 21 ++-
.../client/DefaultEnsemblePlacementPolicy.java | 16 +--
.../bookkeeper/client/EnsemblePlacementPolicy.java | 54 +++++--
.../ITopologyAwareEnsemblePlacementPolicy.java | 3 +-
.../client/RackawareEnsemblePlacementPolicy.java | 7 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 37 ++---
.../client/RegionAwareEnsemblePlacementPolicy.java | 14 +-
.../client/GenericEnsemblePlacementPolicyTest.java | 5 +-
.../TestRackawareEnsemblePlacementPolicy.java | 160 ++++++++++-----------
...ackawareEnsemblePlacementPolicyUsingScript.java | 51 +++----
.../TestRackawarePolicyNotificationUpdates.java | 12 +-
.../TestRegionAwareEnsemblePlacementPolicy.java | 116 ++++++---------
14 files changed, 245 insertions(+), 265 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 7b7cc46..db88bc7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +80,7 @@ public class LocalBookieEnsemblePlacementPolicy implements
EnsemblePlacementPoli
}
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
java.util.Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
@@ -110,14 +109,14 @@ public class LocalBookieEnsemblePlacementPolicy
implements EnsemblePlacementPoli
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, java.util.Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
if (ensembleSize > 1) {
throw new IllegalArgumentException("Local ensemble policy can only
return 1 bookie");
}
- return Pair.of(Lists.newArrayList(bookieAddress), true);
+ return PlacementResult.of(Lists.newArrayList(bookieAddress), true);
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 7b49292..1b4f476 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -81,7 +81,6 @@ import
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -994,7 +993,7 @@ public class BookKeeperAdmin implements AutoCloseable {
// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
@@ -1003,8 +1002,8 @@ public class BookKeeperAdmin implements AutoCloseable {
ensemble,
oldBookie,
bookiesToExclude);
- BookieSocketAddress newBookie = replaceBookieResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ BookieSocketAddress newBookie = replaceBookieResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
if (!isEnsembleAdheringToPlacementPolicy) {
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index 74c1df9..f4dd9a9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -50,7 +50,6 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
-import org.apache.commons.lang3.tuple.Pair;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -226,15 +225,15 @@ class BookieWatcherImpl implements BookieWatcher {
int ackQuorumSize, Map<String, byte[]> customMetadata)
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
- Pair<List<BookieSocketAddress>, Boolean> newEnsembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
newEnsembleResponse;
List<BookieSocketAddress> socketAddresses;
boolean isEnsembleAdheringToPlacementPolicy = false;
try {
Set<BookieSocketAddress> quarantinedBookiesSet =
quarantinedBookies.asMap().keySet();
newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize,
writeQuorumSize, ackQuorumSize,
customMetadata, new
HashSet<BookieSocketAddress>(quarantinedBookiesSet));
- socketAddresses = newEnsembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ socketAddresses = newEnsembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.isStrictlyAdheringToPolicy();
if (!isEnsembleAdheringToPlacementPolicy) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn("New ensemble: {} is not adhering to Placement
Policy. quarantinedBookies: {}",
@@ -248,8 +247,8 @@ class BookieWatcherImpl implements BookieWatcher {
}
newEnsembleResponse = placementPolicy.newEnsemble(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, new HashSet<>());
- socketAddresses = newEnsembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ socketAddresses = newEnsembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.isStrictlyAdheringToPolicy();
if (!isEnsembleAdheringToPlacementPolicy) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn("New ensemble: {} is not adhering to Placement
Policy", socketAddresses);
@@ -267,7 +266,7 @@ class BookieWatcherImpl implements BookieWatcher {
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
BookieSocketAddress addr = existingBookies.get(bookieIdx);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse;
BookieSocketAddress socketAddress;
boolean isEnsembleAdheringToPlacementPolicy = false;
try {
@@ -279,8 +278,8 @@ class BookieWatcherImpl implements BookieWatcher {
replaceBookieResponse = placementPolicy.replaceBookie(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
existingBookies, addr,
excludedBookiesAndQuarantinedBookies);
- socketAddress = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ socketAddress = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
if (!isEnsembleAdheringToPlacementPolicy) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn(
@@ -295,8 +294,8 @@ class BookieWatcherImpl implements BookieWatcher {
}
replaceBookieResponse =
placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, existingBookies, addr, excludeBookies);
- socketAddress = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ socketAddress = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
if (!isEnsembleAdheringToPlacementPolicy) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn(
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index dddbe1c..c130b5d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,12 +64,12 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize, int ackQuorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int quorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
ArrayList<BookieSocketAddress> newBookies = new
ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
- return Pair.of(newBookies, false);
+ return PlacementResult.of(newBookies, false);
}
List<BookieSocketAddress> allBookies;
rwLock.readLock().lock();
@@ -96,7 +95,7 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
newBookies.add(b);
--ensembleSize;
if (ensembleSize == 0) {
- return Pair.of(newBookies,
+ return PlacementResult.of(newBookies,
isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
}
}
@@ -112,7 +111,7 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
newBookies.add(bookie);
--ensembleSize;
if (ensembleSize == 0) {
- return Pair.of(newBookies,
+ return PlacementResult.of(newBookies,
isEnsembleAdheringToPlacementPolicy(newBookies,
quorumSize, ackQuorumSize));
}
}
@@ -121,17 +120,18 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
excludeBookies.addAll(currentEnsemble);
- List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies).getLeft();
+ List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies).getResult();
BookieSocketAddress candidateAddr = addresses.get(0);
List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
- return Pair.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize,
ackQuorumSize));
+ return PlacementResult.of(candidateAddr,
+ isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 00bac8e..fad3f92 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -35,7 +35,6 @@ import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang3.tuple.Pair;
/**
* {@link EnsemblePlacementPolicy} encapsulates the algorithm that bookkeeper
client uses to select a number of bookies
@@ -262,13 +261,13 @@ public interface EnsemblePlacementPolicy {
* provides in {@link BookKeeper#createLedger(int,
int, int, BookKeeper.DigestType, byte[])}
* @param excludeBookies Bookies that should not be considered as targets.
* @throws BKNotEnoughBookiesException if not enough bookies available.
- * @return the List<org.apache.bookkeeper.net.BookieSocketAddress>
+ * @return a placement result containing list of bookie addresses for the
ensemble.
*/
- Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]>
customMetadata,
-
Set<BookieSocketAddress> excludeBookies)
+ PlacementResult<List<BookieSocketAddress>> newEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException;
/**
@@ -286,15 +285,15 @@ public interface EnsemblePlacementPolicy {
* @param bookieToReplace bookie to replace
* @param excludeBookies bookies that should not be considered as
candidate.
* @throws BKNotEnoughBookiesException
- * @return the org.apache.bookkeeper.net.BookieSocketAddress
+ * @return a placement result containing the new bookie address.
*/
- Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]>
customMetadata,
- List<BookieSocketAddress>
currentEnsemble,
- BookieSocketAddress
bookieToReplace,
- Set<BookieSocketAddress>
excludeBookies)
+ PlacementResult<BookieSocketAddress> replaceBookie(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+
List<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress
bookieToReplace,
+
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException;
/**
@@ -408,4 +407,29 @@ public interface EnsemblePlacementPolicy {
int ackQuorumSize) {
return false;
}
+
+ /**
+ * Result of a placement calculation against a placement policy.
+ */
+ final class PlacementResult<T> {
+ private final T result;
+ private final boolean adhering;
+
+ public static <T> PlacementResult<T> of(T result, boolean adhering) {
+ return new PlacementResult<>(result, adhering);
+ }
+
+ private PlacementResult(T result, boolean adhering) {
+ this.result = result;
+ this.adhering = adhering;
+ }
+
+ public T getResult() {
+ return result;
+ }
+
+ public boolean isStrictlyAdheringToPolicy() {
+ return adhering;
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 254f535..b196236 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -28,7 +28,6 @@ import
org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.Node;
-import org.apache.commons.lang3.tuple.Pair;
/**
* Interface for topology aware ensemble placement policy.
@@ -94,7 +93,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T
extends Node> extends E
* @return list of bookies forming the ensemble
* @throws BKException.BKNotEnoughBookiesException
*/
- Pair<List<BookieSocketAddress>, Boolean> newEnsemble(
+ PlacementResult<List<BookieSocketAddress>> newEnsemble(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 8054d97..a7cdce4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang3.tuple.Pair;
/**
* A placement policy implementation use rack information for placing
ensembles.
@@ -92,7 +91,7 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
@@ -107,7 +106,7 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
@@ -143,7 +142,7 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieSocketAddress>
excludeBookies,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 6db7de8..02cf3c3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -71,7 +71,6 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -526,17 +525,17 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(ensembleSize, writeQuorumSize,
excludeBookies, null, null);
}
- protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(int
ensembleSize,
- int
writeQuorumSize,
-
Set<BookieSocketAddress> excludeBookies,
-
Ensemble<BookieNode> parentEnsemble,
-
Predicate<BookieNode> parentPredicate)
+ protected PlacementResult<List<BookieSocketAddress>>
newEnsembleInternal(int ensembleSize,
+
int writeQuorumSize,
+
Set<BookieSocketAddress> excludeBookies,
+
Ensemble<BookieNode> parentEnsemble,
+
Predicate<BookieNode> parentPredicate)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(
ensembleSize,
@@ -548,12 +547,12 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Set<BookieSocketAddress>
excludeBookies,
- Ensemble<BookieNode>
parentEnsemble,
- Predicate<BookieNode>
parentPredicate)
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize,
+ int
writeQuorumSize,
+ int
ackQuorumSize,
+
Set<BookieSocketAddress> excludeBookies,
+
Ensemble<BookieNode> parentEnsemble,
+
Predicate<BookieNode> parentPredicate)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(
ensembleSize,
@@ -564,7 +563,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
parentPredicate);
}
- protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(
+ protected PlacementResult<List<BookieSocketAddress>> newEnsembleInternal(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
@@ -598,7 +597,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return Pair.of(addrs, false);
+ return PlacementResult.of(addrs, false);
}
for (int i = 0; i < ensembleSize; i++) {
@@ -622,14 +621,16 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
ensembleSize, bookieList);
throw new BKNotEnoughBookiesException();
}
- return Pair.of(bookieList,
isEnsembleAdheringToPlacementPolicy(bookieList, writeQuorumSize,
ackQuorumSize));
+ return PlacementResult.of(bookieList,
+ isEnsembleAdheringToPlacementPolicy(
+ bookieList, writeQuorumSize,
ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
@@ -676,7 +677,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
} else {
newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
}
- return Pair.of(candidateAddr,
+ return PlacementResult.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 1bd4b75..f91e9ef 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,7 +225,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
@@ -279,7 +279,9 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return Pair.of(addrs,
isEnsembleAdheringToPlacementPolicy(addrs, writeQuorumSize, ackQuorumSize));
+ return PlacementResult.of(addrs,
+ isEnsembleAdheringToPlacementPolicy(
+ addrs, writeQuorumSize,
ackQuorumSize));
}
// Single region, fall back to RackAwareEnsemblePlacement
@@ -347,7 +349,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
try {
List<BookieSocketAddress> allocated =
policyWithinRegion.newEnsemble(newEnsembleSize,
newWriteQuorumSize,
newWriteQuorumSize, excludeBookies, tempEnsemble,
- tempEnsemble).getLeft();
+ tempEnsemble).getResult();
ensemble = tempEnsemble;
remainingEnsemble -= addToEnsembleSize;
remainingWriteQuorum -= addToWriteQuorum;
@@ -408,7 +410,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
}
LOG.info("Bookies allocated successfully {}", ensemble);
List<BookieSocketAddress> ensembleList = ensemble.toList();
- return Pair.of(ensembleList,
+ return PlacementResult.of(ensembleList,
isEnsembleAdheringToPlacementPolicy(ensembleList,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
@@ -416,7 +418,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
}
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
@@ -482,7 +484,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
} else {
newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
}
- return Pair.of(candidateAddr,
+ return PlacementResult.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index 205c5f4..828a431 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -33,7 +33,6 @@ import java.util.Set;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -70,7 +69,7 @@ public class GenericEnsemblePlacementPolicyTest extends
BookKeeperClusterTestCas
public static final class CustomEnsemblePlacementPolicy extends
DefaultEnsemblePlacementPolicy {
@Override
- public Pair<BookieSocketAddress, Boolean> replaceBookie(int
ensembleSize, int writeQuorumSize,
+ public PlacementResult<BookieSocketAddress> replaceBookie(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
@@ -82,7 +81,7 @@ public class GenericEnsemblePlacementPolicyTest extends
BookKeeperClusterTestCas
}
@Override
- public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize,
+ public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int quorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
assertNotNull(customMetadata);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e6cd07b..a55b560 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -52,7 +52,6 @@ import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -567,10 +566,10 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, new HashSet<>());
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse =
+ repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new
HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertEquals(addr3, replacedBookie);
assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@@ -596,10 +595,10 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, excludedAddrs);
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse =
+ repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2,
excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -658,13 +657,13 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
List<BookieSocketAddress> ensembleBookies = new
ArrayList<BookieSocketAddress>();
ensembleBookies.add(addr2);
ensembleBookies.add(addr4);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse = repp.replaceBookie(
1, 1, 1 , null,
ensembleBookies,
addr4,
new HashSet<>());
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertEquals(addr1, replacedBookie);
assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@@ -683,16 +682,16 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2,
conf.getMinNumRacksPerWriteQuorum()));
assertFalse(isEnsembleAdheringToPlacementPolicy);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse2;
ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new
HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
+ List<BookieSocketAddress> ensemble2 =
ensembleResponse2.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.isStrictlyAdheringToPolicy();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum()));
assertFalse(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -724,19 +723,16 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(3, 2, 2, null, new
HashSet<>()).getResult();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
- ensembleResponse = repp.newEnsemble(3, 2, 2, new HashSet<>(),
- EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE).getResult();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
@@ -789,7 +785,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* and there are enough bookies in 3 racks, this newEnsemble calls
should
* succeed.
*/
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
boolean isEnsembleAdheringToPlacementPolicy;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
@@ -797,16 +793,16 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
int ackQuorumSize = numOfRacks;
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
- ensemble = ensembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy);
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
- ensemble = ensembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -853,21 +849,21 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* ensembleSizes (as long as there are enough number of bookies in each
* rack).
*/
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
boolean isEnsembleAdheringToPlacementPolicy;
for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum;
ensembleSize < 40; ensembleSize++) {
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
- ensemble = ensembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy);
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
- ensemble = ensembleResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -912,14 +908,14 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* and there are enough bookies in 3 racks, this newEnsemble call
should
* succeed.
*/
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = ensembleResponse.getResult();
BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
// get rack of some other bookie
@@ -958,13 +954,13 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* this replaceBookie should succeed, because a new bookie is added to
a
* new rack.
*/
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse;
BookieSocketAddress replacedBookieAddress;
boolean isEnsembleAdheringToPlacementPolicy;
replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null, ensemble,
bookieInEnsembleToBeReplaced, new HashSet<>());
- replacedBookieAddress = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
+ replacedBookieAddress = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertEquals("It should be newBookieAddress2", newBookieAddress2,
replacedBookieAddress);
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -998,8 +994,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
*/
replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null,
ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
- replacedBookieAddress = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
+ replacedBookieAddress = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertEquals("It should be newBookieAddress3", newBookieAddress3,
replacedBookieAddress);
assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@@ -1348,18 +1344,20 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- acqQuorumSize, null, new HashSet<>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
int numCovered = getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
assertFalse(isEnsembleAdheringToPlacementPolicy);
ensembleSize = 4;
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- acqQuorumSize, null, new HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse2 =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble2 =
ensembleResponse2.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.isStrictlyAdheringToPolicy();
numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
assertFalse(isEnsembleAdheringToPlacementPolicy2);
@@ -1431,10 +1429,11 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- writeQuorumSize, null, new HashSet<>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ writeQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
minNumRacksPerWriteQuorum);
assertEquals("minimum number of racks covered for writequorum
ensemble: " + ensemble, ensembleSize, numCovered);
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -1475,19 +1474,20 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- ackQuorumSize, null, new HashSet<>());
- List<BookieSocketAddress> ensemble1 = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy1 =
ensembleResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy1 =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble1, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy1);
ensembleSize = 4;
writeQuorumSize = 4;
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- 2, null, new HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse2 =
+ repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null, new
HashSet<>());
+ List<BookieSocketAddress> ensemble2 =
ensembleResponse2.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.isStrictlyAdheringToPolicy();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy2);
@@ -1561,14 +1561,14 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse;
boolean isEnsembleAdheringToPlacementPolicy;
BookieSocketAddress replacedBookie;
for (int i = 0; i < numTries; i++) {
// replace node under r2
replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
- replacedBookie = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ replacedBookie = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertTrue("replaced : " + replacedBookie,
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
assertTrue(isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
@@ -1626,7 +1626,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress>
replaceBookieResponse;
BookieSocketAddress replacedBookie;
boolean isEnsembleAdheringToPlacementPolicy;
for (int i = 0; i < numTries; i++) {
@@ -1634,8 +1634,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
// will come from other racks. However, the weight should be
honored in such
// selections as well
replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
- replacedBookie = replaceBookieResponse.getLeft();
- isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ replacedBookie = replaceBookieResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.isStrictlyAdheringToPolicy();
assertTrue(addr0.equals(replacedBookie) ||
addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
|| addr4.equals(replacedBookie));
assertTrue(isEnsembleAdheringToPlacementPolicy);
@@ -1730,7 +1730,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
int numTries = 10000;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = 3;
int writeQuorumSize = 2;
@@ -1740,7 +1740,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
// will come from other racks. However, the weight should be
honored in such
// selections as well
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
- ensemble = ensembleResponse.getLeft();
+ ensemble = ensembleResponse.getResult();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
@@ -1802,7 +1802,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
repp.updateBookieInfo(bookieInfoMap);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
try {
@@ -1811,14 +1811,14 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
excludeList.add(addr3);
excludeList.add(addr4);
ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
- ensemble = ensembleResponse.getLeft();
+ ensemble = ensembleResponse.getResult();
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies" + ensemble);
} catch (BKNotEnoughBookiesException e) {
// this is expected
}
try {
ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
- ensemble = ensembleResponse.getLeft();
+ ensemble = ensembleResponse.getResult();
} catch (BKNotEnoughBookiesException e) {
fail("Should not throw BKNotEnoughBookiesException when there are
enough bookies for the ensemble");
}
@@ -1902,19 +1902,19 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
// we will never use addr4 even it is in the stabilized network
topology
for (int i = 0; i < 5; i++) {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse =
+ repp.newEnsemble(3, 2, 2, null, new
HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertFalse(ensemble.contains(addr4));
assertFalse(isEnsembleAdheringToPlacementPolicy);
}
// we could still use addr4 for urgent allocation if it is just bookie
flapping
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
- boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse =
+ repp.newEnsemble(4, 2, 2, null, new
HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertFalse(isEnsembleAdheringToPlacementPolicy);
assertTrue(ensemble.contains(addr4));
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index e0fd2bd..2aff5d8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -46,7 +46,6 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.Shell;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -114,9 +113,8 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, new HashSet<>());
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new ArrayList<>(),
+ addr2, new
HashSet<>()).getResult();
assertEquals(addr3, replacedBookie);
}
@@ -138,9 +136,8 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, excludedAddrs);
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new ArrayList<>(),
+ addr2,
excludedAddrs).getResult();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -204,9 +201,8 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, excludedAddrs);
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new ArrayList<>(),
+ addr2,
excludedAddrs).getResult();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -244,9 +240,8 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
- addr2, excludedAddrs);
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new ArrayList<>(),
+ addr2,
excludedAddrs).getResult();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -268,13 +263,11 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ new
HashSet<>()).getResult();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new
HashSet<>()).getResult();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -296,14 +289,12 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ new
HashSet<>()).getResult();
int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
assertTrue(numCovered == 2);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new
HashSet<>()).getResult();
numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
assertTrue(numCovered == 2);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -334,13 +325,11 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
+ List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null,
+ new
HashSet<>()).getResult();
assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new
HashSet<>()).getResult();
assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception.");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index 3192d04..b206e64 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -42,7 +42,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.apache.commons.lang3.tuple.Pair;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,9 +105,8 @@ public class TestRackawarePolicyNotificationUpdates extends
TestCase {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
- acqQuorumSize, Collections.emptyMap(), Collections.emptySet());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize,
writeQuorumSize,
+ acqQuorumSize, Collections.emptyMap(),
Collections.emptySet()).getResult();
int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
@@ -120,9 +119,8 @@ public class TestRackawarePolicyNotificationUpdates extends
TestCase {
StaticDNSResolver.changeRack(bookieAddressList, rackList);
numOfAvailableRacks = numOfAvailableRacks + 1;
acqQuorumSize = 1;
- ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
- Collections.emptySet());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
+ Collections.emptySet()).getResult();
assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
assertTrue(ensemble.contains(addr1));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 8e4f10d..a7f42aa 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -51,7 +51,6 @@ import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -424,9 +423,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
- new ArrayList<BookieSocketAddress>(), addr2, new
HashSet<BookieSocketAddress>());
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2, new
HashSet<BookieSocketAddress>()).getResult();
assertEquals(addr3, replacedBookie);
}
@@ -451,9 +449,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
- new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
- BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2,
excludedAddrs).getResult();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -478,9 +475,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(5, 5, 3, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> list = ensembleResponse.getLeft();
+ List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null,
+ new HashSet<BookieSocketAddress>()).getResult();
LOG.info("Ensemble : {}", list);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -541,13 +537,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -576,9 +570,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
assertTrue(numCovered >= 1);
assertTrue(numCovered < 3);
@@ -586,9 +579,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -627,13 +619,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
+ List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -680,27 +670,23 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr10);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ new HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensembleResponse = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 7);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensembleResponse = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 8);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensembleResponse = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 9);
@@ -753,9 +739,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
((SettableFeature)
featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assertEquals(2, getNumRegionsInEnsemble(ensemble));
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
@@ -776,9 +761,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
}
try {
((SettableFeature)
featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
assert(ensemble.contains(addr4));
@@ -851,9 +835,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -864,9 +847,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
try {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr10);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
- excludedAddrs);
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null,
+
excludedAddrs).getResult();
assert(ensemble.contains(addr11) && ensemble.contains(addr12));
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -956,11 +938,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
ackQuorum = 5;
}
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensembleResponse = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -981,9 +961,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
for (BookieSocketAddress addr: region2Bookies) {
if (ensemble.contains(addr)) {
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
- ensemble, addr, excludedAddrs);
- BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(6,
6, ackQuorum, null,
+
ensemble, addr, excludedAddrs).getResult();
ensemble.remove(addr);
ensemble.add(replacedBookie);
}
@@ -1007,9 +986,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
- ensemble, bookieToReplace, excludedAddrs);
- BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
+ BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6,
ackQuorum, null,
+ ensemble, bookieToReplace, excludedAddrs).getResult();
assert (replacedBookie.equals(replacedBookieExpected));
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -1092,11 +1070,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
.set(true);
}
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensembleResponse = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
- ensemble = ensembleResponse.getLeft();
+ ensemble = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>()).getResult();
assert(ensemble.size() == 6);
} catch (BKNotEnoughBookiesException bnebe) {
LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -1160,8 +1136,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
excludedAddrs.add(addr10);
excludedAddrs.add(addr9);
try {
- Pair<List<BookieSocketAddress>, Boolean> list =
repp.newEnsemble(5, 5, 5, null, excludedAddrs);
- LOG.info("Ensemble : {}", list.getLeft());
+ LOG.info("Ensemble : {}", repp.newEnsemble(5, 5, 5, null,
excludedAddrs).getResult());
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -1220,9 +1195,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
private void basicReorderReadSequenceWithLocalRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1278,9 +1252,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1350,9 +1323,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
prepareNetworkTopologyForReorderTests(myRegion);
- Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
- new HashSet<BookieSocketAddress>());
- List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
+ new
HashSet<BookieSocketAddress>()).getResult();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);