This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 42fe150 Enhance EnsemblePlacementPolicy and DNSResolverDecorator to
log relevant metrics.
42fe150 is described below
commit 42fe1506afe04b8068b897c530c7ca7188da503e
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Sat Jan 12 18:04:35 2019 -0800
Enhance EnsemblePlacementPolicy and DNSResolverDecorator to log relevant
metrics.
Descriptions of the changes in this PR:
Make changes to EnsemblePlacementPolicy so that it would return boolean
value indicating
if the return value of newEnsemble and replaceBookie are strictly adhering
to corresponding
PlacementPolicy or it fell back to random.
Similarly DNSResolverDecorator should log a metric when it was unable to
resolve rack info and
it is using default rack.
Reviewers: Samuel Just <[email protected]>, Sijie Guo
<[email protected]>, Venkateswararao Jujjuri (JV) <None>
This closes #1883 from reddycharan/enhanceplacementpolicy
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 2 +
.../bookie/LocalBookieEnsemblePlacementPolicy.java | 19 ++-
.../apache/bookkeeper/client/BookKeeperAdmin.java | 16 +-
.../bookkeeper/client/BookieWatcherImpl.java | 71 +++++++--
.../client/DefaultEnsemblePlacementPolicy.java | 29 +++-
.../bookkeeper/client/EnsemblePlacementPolicy.java | 50 ++++--
.../ITopologyAwareEnsemblePlacementPolicy.java | 3 +-
.../client/RackawareEnsemblePlacementPolicy.java | 15 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 100 ++++++++++--
.../client/RegionAwareEnsemblePlacementPolicy.java | 42 ++++-
.../apache/bookkeeper/net/ScriptBasedMapping.java | 8 +-
.../client/GenericEnsemblePlacementPolicyTest.java | 7 +-
.../TestRackawareEnsemblePlacementPolicy.java | 177 +++++++++++++++------
...ackawareEnsemblePlacementPolicyUsingScript.java | 45 ++++--
.../TestRackawarePolicyNotificationUpdates.java | 9 +-
.../TestRegionAwareEnsemblePlacementPolicy.java | 108 ++++++++-----
16 files changed, 515 insertions(+), 186 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cdafd15..b58514b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -69,6 +69,8 @@ public interface BookKeeperServerStats {
String WATCHER_SCOPE = "bookie_watcher";
String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+ String FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER =
"FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER";
+ String ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER =
"ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
// Bookie Operations
String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 46978ea..7b7cc46 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,9 +81,9 @@ public class LocalBookieEnsemblePlacementPolicy implements
EnsemblePlacementPoli
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
- BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ java.util.Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
throw new BKNotEnoughBookiesException();
}
@@ -109,18 +110,24 @@ public class LocalBookieEnsemblePlacementPolicy
implements EnsemblePlacementPoli
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, java.util.Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
if (ensembleSize > 1) {
throw new IllegalArgumentException("Local ensemble policy can only
return 1 bookie");
}
- return Lists.newArrayList(bookieAddress);
+ return Pair.of(Lists.newArrayList(bookieAddress), true);
}
@Override
public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo>
bookieToFreeSpaceMap) {
return;
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 3f837db..7b49292 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -82,6 +81,7 @@ import
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -994,15 +994,25 @@ public class BookKeeperAdmin implements AutoCloseable {
// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
- BookieSocketAddress newBookie =
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata(),
- new HashSet<>(ensemble),
+ ensemble,
oldBookie,
bookiesToExclude);
+ BookieSocketAddress newBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "replaceBookie for bookie: {} in ensemble: {} "
+ + "is not adhering to placement policy and
chose {}",
+ oldBookie, ensemble, newBookie);
+ }
+ }
targetBookieAddresses.put(bookieIndex, newBookie);
bookiesToExclude.add(newBookie);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index 8b14b77..74c1df9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
@@ -35,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.client.BKException.BKInterruptedException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -43,9 +46,11 @@ import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.commons.lang3.tuple.Pair;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -88,6 +93,12 @@ class BookieWatcherImpl implements BookieWatcher {
help = "operation stats of replacing bookie in an ensemble"
)
private final OpStatsLogger replaceBookieTimer;
+ @StatsDoc(
+ name = ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
+ help = "total number of newEnsemble/replaceBookie operations
failed to adhere"
+ + " EnsemblePlacementPolicy"
+ )
+ private final Counter ensembleNotAdheringToPlacementPolicy;
// Bookies that will not be preferred to be chosen in a new ensemble
final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -117,6 +128,8 @@ class BookieWatcherImpl implements BookieWatcher {
}).build();
this.newEnsembleTimer =
statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
this.replaceBookieTimer =
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
+ this.ensembleNotAdheringToPlacementPolicy = statsLogger
+
.getCounter(BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
}
@Override
@@ -213,19 +226,34 @@ class BookieWatcherImpl implements BookieWatcher {
int ackQuorumSize, Map<String, byte[]> customMetadata)
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
+ Pair<List<BookieSocketAddress>, Boolean> newEnsembleResponse;
List<BookieSocketAddress> socketAddresses;
+ boolean isEnsembleAdheringToPlacementPolicy = false;
try {
- socketAddresses = placementPolicy.newEnsemble(ensembleSize,
- writeQuorumSize, ackQuorumSize, customMetadata, new
HashSet<BookieSocketAddress>(
- quarantinedBookies.asMap().keySet()));
+ Set<BookieSocketAddress> quarantinedBookiesSet =
quarantinedBookies.asMap().keySet();
+ newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize,
writeQuorumSize, ackQuorumSize,
+ customMetadata, new
HashSet<BookieSocketAddress>(quarantinedBookiesSet));
+ socketAddresses = newEnsembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn("New ensemble: {} is not adhering to Placement
Policy. quarantinedBookies: {}",
+ socketAddresses, quarantinedBookiesSet);
+ }
// we try to only get from the healthy bookies first
newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
} catch (BKNotEnoughBookiesException e) {
if (log.isDebugEnabled()) {
log.debug("Not enough healthy bookies available, using
quarantined bookies");
}
- socketAddresses = placementPolicy.newEnsemble(
+ newEnsembleResponse = placementPolicy.newEnsemble(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, new HashSet<>());
+ socketAddresses = newEnsembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn("New ensemble: {} is not adhering to Placement
Policy", socketAddresses);
+ }
newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
}
return socketAddresses;
@@ -239,22 +267,43 @@ class BookieWatcherImpl implements BookieWatcher {
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
BookieSocketAddress addr = existingBookies.get(bookieIdx);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
BookieSocketAddress socketAddress;
+ boolean isEnsembleAdheringToPlacementPolicy = false;
try {
// we exclude the quarantined bookies also first
- Set<BookieSocketAddress> existingAndQuarantinedBookies = new
HashSet<BookieSocketAddress>(existingBookies);
-
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
- socketAddress = placementPolicy.replaceBookie(
+ Set<BookieSocketAddress> excludedBookiesAndQuarantinedBookies =
new HashSet<BookieSocketAddress>(
+ excludeBookies);
+ Set<BookieSocketAddress> quarantinedBookiesSet =
quarantinedBookies.asMap().keySet();
+ excludedBookiesAndQuarantinedBookies.addAll(quarantinedBookiesSet);
+ replaceBookieResponse = placementPolicy.replaceBookie(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- existingAndQuarantinedBookies, addr, excludeBookies);
+ existingBookies, addr,
excludedBookiesAndQuarantinedBookies);
+ socketAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn(
+ "replaceBookie for bookie: {} in ensemble: {} is not
adhering to placement policy and"
+ + " chose {}. excludedBookies {} and
quarantinedBookies {}",
+ addr, existingBookies, socketAddress, excludeBookies,
quarantinedBookiesSet);
+ }
replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
} catch (BKNotEnoughBookiesException e) {
if (log.isDebugEnabled()) {
log.debug("Not enough healthy bookies available, using
quarantined bookies");
}
- socketAddress = placementPolicy.replaceBookie(
- ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- new HashSet<BookieSocketAddress>(existingBookies), addr,
excludeBookies);
+ replaceBookieResponse =
placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ customMetadata, existingBookies, addr, excludeBookies);
+ socketAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn(
+ "replaceBookie for bookie: {} in ensemble: {} is not
adhering to placement policy and"
+ + " chose {}. excludedBookies {}",
+ addr, existingBookies, socketAddress, excludeBookies);
+ }
replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
}
return socketAddress;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 917d18f..dddbe1c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,12 +65,12 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int
quorumSize, int ackQuorumSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
ArrayList<BookieSocketAddress> newBookies = new
ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
- return newBookies;
+ return Pair.of(newBookies, false);
}
List<BookieSocketAddress> allBookies;
rwLock.readLock().lock();
@@ -95,7 +96,8 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
newBookies.add(b);
--ensembleSize;
if (ensembleSize == 0) {
- return newBookies;
+ return Pair.of(newBookies,
+
isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
}
}
} finally {
@@ -110,7 +112,8 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
newBookies.add(bookie);
--ensembleSize;
if (ensembleSize == 0) {
- return newBookies;
+ return Pair.of(newBookies,
+ isEnsembleAdheringToPlacementPolicy(newBookies,
quorumSize, ackQuorumSize));
}
}
}
@@ -118,13 +121,17 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
excludeBookies.addAll(currentEnsemble);
- ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies);
- return addresses.get(0);
+ List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies).getLeft();
+
+ BookieSocketAddress candidateAddr = addresses.get(0);
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ return Pair.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize,
ackQuorumSize));
}
@Override
@@ -210,4 +217,10 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
public void uninitalize() {
// do nothing
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 23932a3..00bac8e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
/**
* {@link EnsemblePlacementPolicy} encapsulates the algorithm that bookkeeper
client uses to select a number of bookies
@@ -263,12 +264,12 @@ public interface EnsemblePlacementPolicy {
* @throws BKNotEnoughBookiesException if not enough bookies available.
* @return the List<org.apache.bookkeeper.net.BookieSocketAddress>
*/
- List<BookieSocketAddress> newEnsemble(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]>
customMetadata,
- Set<BookieSocketAddress>
excludeBookies)
- throws BKNotEnoughBookiesException;
+ Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+
Set<BookieSocketAddress> excludeBookies)
+ throws BKNotEnoughBookiesException;
/**
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie
available in the cluster,
@@ -287,14 +288,14 @@ public interface EnsemblePlacementPolicy {
* @throws BKNotEnoughBookiesException
* @return the org.apache.bookkeeper.net.BookieSocketAddress
*/
- BookieSocketAddress replaceBookie(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]> customMetadata,
- Set<BookieSocketAddress> currentEnsemble,
- BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies)
- throws BKNotEnoughBookiesException;
+ Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+ List<BookieSocketAddress>
currentEnsemble,
+ BookieSocketAddress
bookieToReplace,
+ Set<BookieSocketAddress>
excludeBookies)
+ throws BKNotEnoughBookiesException;
/**
* Register a bookie as slow so that it is tried after available and
read-only bookies.
@@ -386,4 +387,25 @@ public interface EnsemblePlacementPolicy {
return (currentStickyBookieIndex.get() + 1) %
metadata.getEnsembleSize();
}
}
+
+ /**
+ * returns true if the Ensemble is strictly adhering to placement policy,
+ * like in the case of RackawareEnsemblePlacementPolicy, bookies in the
+ * writeset are from 'minNumRacksPerWriteQuorum' number of racks. And in
the
+ * case of RegionawareEnsemblePlacementPolicy, check for
+ * minimumRegionsForDurability, reppRegionsToWrite, rack distribution
within
+ * a region and other parameters of RegionAwareEnsemblePlacementPolicy.
+ *
+ * @param ensembleList
+ * list of BookieSocketAddress of bookies in the ensemble
+ * @param writeQuorumSize
+ * writeQuorumSize of the ensemble
+ * @param ackQuorumSize
+ * ackQuorumSize of the ensemble
+ * @return
+ */
+ default boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return false;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 7c9e07c..254f535 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import
org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.Node;
+import org.apache.commons.lang3.tuple.Pair;
/**
* Interface for topology aware ensemble placement policy.
@@ -93,7 +94,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T
extends Node> extends E
* @return list of bookies forming the ensemble
* @throws BKException.BKNotEnoughBookiesException
*/
- List<BookieSocketAddress> newEnsemble(
+ Pair<List<BookieSocketAddress>, Boolean> newEnsemble(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1fd7580..8054d97 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -18,19 +18,16 @@
package org.apache.bookkeeper.client;
import io.netty.util.HashedWheelTimer;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
-import
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
-import
org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
/**
* A placement policy implementation use rack information for placing
ensembles.
@@ -95,8 +92,8 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
return super.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, customMetadata, excludeBookies);
@@ -110,8 +107,8 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
@@ -146,7 +143,7 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieSocketAddress>
excludeBookies,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7578c41..6db7de8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
import static
org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
@@ -35,6 +36,7 @@ import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,10 +66,12 @@ import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,12 +150,19 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
final Supplier<String> defaultRackSupplier;
final DNSToSwitchMapping resolver;
-
- DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier) {
+ @StatsDoc(
+ name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+ help = "total number of times Resolver failed to resolve rack
information of a node"
+ )
+ final Counter failedToResolveNetworkLocationCounter;
+
+ DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier,
+ Counter failedToResolveNetworkLocationCounter) {
checkNotNull(resolver, "Resolver cannot be null");
checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
this.defaultRackSupplier = defaultRackSupplier;
this.resolver = resolver;
+ this.failedToResolveNetworkLocationCounter =
failedToResolveNetworkLocationCounter;
}
public List<String> resolve(List<String> names) {
@@ -167,6 +178,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
if (rNames.get(i) == null) {
LOG.warn("Failed to resolve network location for {},
using default rack for it : {}.",
names.get(i), defaultRack);
+ failedToResolveNetworkLocationCounter.inc();
rNames.set(i, defaultRack);
}
}
@@ -178,6 +190,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
rNames = new ArrayList<>(names.size());
for (int i = 0; i < names.size(); ++i) {
+ failedToResolveNetworkLocationCounter.inc();
rNames.add(defaultRack);
}
return rNames;
@@ -227,6 +240,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
help = "The distribution of number of bookies reordered on each read
request"
)
protected OpStatsLogger readReorderedCounter = null;
+ protected Counter failedToResolveNetworkLocationCounter = null;
private String defaultRack = NetworkTopology.DEFAULT_RACK;
@@ -267,10 +281,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.bookiesJoinedCounter =
statsLogger.getOpStatsLogger(BOOKIES_JOINED);
this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
this.readReorderedCounter =
statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
+ this.failedToResolveNetworkLocationCounter = statsLogger
+ .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
this.reorderReadsRandom = reorderReadsRandom;
this.stabilizePeriodSeconds = stabilizePeriodSeconds;
this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
- this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack());
+ this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack(),
+ failedToResolveNetworkLocationCounter);
this.timer = timer;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
this.enforceMinNumRacksPerWriteQuorum =
enforceMinNumRacksPerWriteQuorum;
@@ -342,9 +359,18 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
((RackChangeNotifier)
dnsResolver).registerRackChangeListener(this);
}
} catch (RuntimeException re) {
- LOG.info("Failed to initialize DNS Resolver {}, used default
subnet resolver : {}",
- dnsResolverName, re, re.getMessage());
- dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
+ if (!enforceMinNumRacksPerWriteQuorum) {
+ LOG.info("Failed to initialize DNS Resolver {}, used
default subnet resolver : {}", dnsResolverName,
+ re, re.getMessage());
+ dnsResolver = new DefaultResolver(() ->
this.getDefaultRack());
+ } else {
+ /*
+ * if minNumRacksPerWriteQuorum is enforced, then it
+ * shouldn't continue in the case of failure to create
+ * dnsResolver.
+ */
+ throw re;
+ }
}
}
slowBookies = CacheBuilder.newBuilder()
@@ -479,7 +505,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
}
- protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress>
excludeBookies) {
+ protected Set<Node> convertBookiesToNodes(Collection<BookieSocketAddress>
excludeBookies) {
Set<Node> nodes = new HashSet<Node>();
for (BookieSocketAddress addr : excludeBookies) {
BookieNode bn = knownBookies.get(addr);
@@ -500,13 +526,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(ensembleSize, writeQuorumSize,
excludeBookies, null, null);
}
- protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+ protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(int
ensembleSize,
int
writeQuorumSize,
Set<BookieSocketAddress> excludeBookies,
Ensemble<BookieNode> parentEnsemble,
@@ -522,7 +548,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieSocketAddress>
excludeBookies,
@@ -538,7 +564,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
parentPredicate);
}
- protected List<BookieSocketAddress> newEnsembleInternal(
+ protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
@@ -572,7 +598,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return addrs;
+ return Pair.of(addrs, false);
}
for (int i = 0; i < ensembleSize; i++) {
@@ -596,15 +622,15 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
ensembleSize, bookieList);
throw new BKNotEnoughBookiesException();
}
- return bookieList;
+ return Pair.of(bookieList,
isEnsembleAdheringToPlacementPolicy(bookieList, writeQuorumSize,
ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
rwLock.readLock().lock();
@@ -639,7 +665,19 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.",
candidate, bn);
}
- return candidate.getAddr();
+ BookieSocketAddress candidateAddr = candidate.getAddr();
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ if (currentEnsemble.isEmpty()) {
+ /*
+ * in testing code there are test cases which would pass empty
+ * currentEnsemble
+ */
+ newEnsemble.add(candidateAddr);
+ } else {
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ }
+ return Pair.of(candidateAddr,
+ isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
@@ -1206,4 +1244,32 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
}
}
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ int ensembleSize = ensembleList.size();
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+ BookieSocketAddress bookie;
+ for (int i = 0; i < ensembleList.size(); i++) {
+ racksOrRegionsInQuorum.clear();
+ for (int j = 0; j < writeQuorumSize; j++) {
+ bookie = ensembleList.get((i + j) % ensembleSize);
+ try {
+
racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+ } catch (Exception e) {
+ /*
+ * any issue/exception in analyzing whether ensemble is
strictly adhering to
+ * placement policy should be swallowed.
+ */
+ LOG.warn("Received exception while trying to get network
location of bookie: {}", bookie, e);
+ }
+ }
+ if (racksOrRegionsInQuorum.size() <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index c52e0fe..1bd4b75 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,8 +225,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
int effectiveMinRegionsForDurability =
disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -279,7 +279,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return addrs;
+ return Pair.of(addrs,
isEnsembleAdheringToPlacementPolicy(addrs, writeQuorumSize, ackQuorumSize));
}
// Single region, fall back to RackAwareEnsemblePlacement
@@ -347,7 +347,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
try {
List<BookieSocketAddress> allocated =
policyWithinRegion.newEnsemble(newEnsembleSize,
newWriteQuorumSize,
newWriteQuorumSize, excludeBookies, tempEnsemble,
- tempEnsemble);
+ tempEnsemble).getLeft();
ensemble = tempEnsemble;
remainingEnsemble -= addToEnsembleSize;
remainingWriteQuorum -= addToWriteQuorum;
@@ -407,15 +407,17 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
throw new BKException.BKNotEnoughBookiesException();
}
LOG.info("Bookies allocated successfully {}", ensemble);
- return ensemble.toList();
+ List<BookieSocketAddress> ensembleList = ensemble.toList();
+ return Pair.of(ensembleList,
+ isEnsembleAdheringToPlacementPolicy(ensembleList,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
rwLock.readLock().lock();
@@ -469,7 +471,19 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.",
candidate, bookieNodeToReplace);
}
- return candidate.getAddr();
+ BookieSocketAddress candidateAddr = candidate.getAddr();
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ if (currentEnsemble.isEmpty()) {
+ /*
+ * in testing code there are test cases which would pass empty
+ * currentEnsemble
+ */
+ newEnsemble.add(candidateAddr);
+ } else {
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ }
+ return Pair.of(candidateAddr,
+ isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
@@ -550,4 +564,16 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
finalList.addMissingIndices(ensemble.size());
return finalList;
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ /**
+ * TODO: have to implement actual logic for this method for
+ * RegionAwareEnsemblePlacementPolicy. For now return true value.
+ *
+ * - https://github.com/apache/bookkeeper/issues/1898
+ */
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 8c8350c..230f66d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -25,8 +25,8 @@ import java.util.StringTokenizer;
import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class implements the {@link DNSToSwitchMapping} interface using a
@@ -129,7 +129,7 @@ public final class ScriptBasedMapping extends
CachedDNSToSwitchMapping {
private static final class RawScriptBasedMapping extends
AbstractDNSToSwitchMapping {
private String scriptName;
private int maxArgs; //max hostnames per call of the script
- private static final Log LOG =
LogFactory.getLog(ScriptBasedMapping.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ScriptBasedMapping.class);
/**
* Set the configuration and extract the configuration parameters of
interest.
@@ -233,7 +233,7 @@ public final class ScriptBasedMapping extends
CachedDNSToSwitchMapping {
s.execute();
allOutput.append(s.getOutput()).append(" ");
} catch (Exception e) {
- LOG.warn("Exception running " + s, e);
+ LOG.warn("Exception running: {} Exception message: {}", s,
e.getMessage());
return null;
}
loopCount++;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index bb55d0c..205c5f4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -69,8 +70,8 @@ public class GenericEnsemblePlacementPolicyTest extends
BookKeeperClusterTestCas
public static final class CustomEnsemblePlacementPolicy extends
DefaultEnsemblePlacementPolicy {
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize,
- int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
new Exception("replaceBookie " + ensembleSize + "," +
customMetadata).printStackTrace();
@@ -81,7 +82,7 @@ public class GenericEnsemblePlacementPolicyTest extends
BookKeeperClusterTestCas
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
int quorumSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
assertNotNull(customMetadata);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e75e93f..e6cd07b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
ClientConfiguration conf = new ClientConfiguration();
BookieSocketAddress addr1, addr2, addr3, addr4;
io.netty.util.HashedWheelTimer timer;
+ final int minNumRacksPerWriteQuorumConfValue = 2;
@Override
protected void setUp() throws Exception {
@@ -80,6 +82,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
StaticDNSResolver.addNodeToRack("localhost",
NetworkTopology.DEFAULT_REGION_AND_RACK);
LOG.info("Set up static DNS Resolver.");
conf.setProperty(REPP_DNS_RESOLVER_CLASS,
StaticDNSResolver.class.getName());
+ conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
addr1 = new BookieSocketAddress("127.0.0.2", 3181);
addr2 = new BookieSocketAddress("127.0.0.3", 3181);
addr3 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -564,8 +567,12 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertEquals(addr3, replacedBookie);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -589,10 +596,13 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
-
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -619,7 +629,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -628,7 +638,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
@Test
public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble()
throws Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -645,15 +655,18 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- Set<BookieSocketAddress> ensembleBookies = new
HashSet<BookieSocketAddress>();
+ List<BookieSocketAddress> ensembleBookies = new
ArrayList<BookieSocketAddress>();
ensembleBookies.add(addr2);
ensembleBookies.add(addr4);
- BookieSocketAddress replacedBookie = repp.replaceBookie(
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(
1, 1, 1 , null,
ensembleBookies,
addr4,
new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertEquals(addr1, replacedBookie);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -670,10 +683,18 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2,
conf.getMinNumRacksPerWriteQuorum()));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2;
+ ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new
HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum()));
+ assertFalse(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -703,17 +724,19 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
+ ensemble = ensembleResponse.getLeft();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
- ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
- TruePredicate.INSTANCE);
+ ensembleResponse = repp.newEnsemble(3, 2, 2, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
@@ -766,19 +789,27 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* and there are enough bookies in 3 racks, this newEnsemble calls
should
* succeed.
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
+ boolean isEnsembleAdheringToPlacementPolicy;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -822,16 +853,24 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* ensembleSizes (as long as there are enough number of bookies in each
* rack).
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
+ boolean isEnsembleAdheringToPlacementPolicy;
for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum;
ensembleSize < 40; ensembleSize++) {
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
}
@@ -873,12 +912,14 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* and there are enough bookies in 3 racks, this newEnsemble call
should
* succeed.
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
// get rack of some other bookie
@@ -895,7 +936,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, new HashSet<>());
+ ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
fail("Should get not enough bookies exception since there are no
more bookies in rack"
+ "of 'bookieInEnsembleToReplace'"
+ "and new bookie added belongs to the rack of some other
bookie in the ensemble");
@@ -917,16 +958,22 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* this replaceBookie should succeed, because a new bookie is added to
a
* new rack.
*/
- BookieSocketAddress replacedBookieAddress =
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
- null, new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ BookieSocketAddress replacedBookieAddress;
+ boolean isEnsembleAdheringToPlacementPolicy;
+ replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null, ensemble,
+ bookieInEnsembleToBeReplaced, new HashSet<>());
+ replacedBookieAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
assertEquals("It should be newBookieAddress2", newBookieAddress2,
replacedBookieAddress);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
bookiesToExclude.add(newBookieAddress2);
repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
try {
- repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, bookiesToExclude);
+ repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null, ensemble,
+ bookieInEnsembleToBeReplaced, bookiesToExclude);
fail("Should get not enough bookies exception since the only
available bookie to replace"
+ "is added to excludedBookies list");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -949,9 +996,12 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
* replaced, so we should be able to replacebookie though
* newBookieAddress2 is added to excluded bookies list.
*/
- replacedBookieAddress = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, bookiesToExclude);
+ replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null,
+ ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
+ replacedBookieAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
assertEquals("It should be newBookieAddress3", newBookieAddress3,
replacedBookieAddress);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -1298,15 +1348,21 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- List<BookieSocketAddress> ensemble =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
int numCovered = getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
ensembleSize = 4;
- List<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
+ assertFalse(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1375,11 +1431,13 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
- List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize,
writeQuorumSize, writeQuorumSize, null,
- new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ writeQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
minNumRacksPerWriteQuorum);
assertEquals("minimum number of racks covered for writequorum
ensemble: " + ensemble, ensembleSize, numCovered);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -1417,16 +1475,22 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
- List<BookieSocketAddress> ensemble1 =
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy1 =
ensembleResponse.getRight();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble1, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy1);
ensembleSize = 4;
writeQuorumSize = 4;
- List<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
- new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ 2, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1497,11 +1561,16 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ boolean isEnsembleAdheringToPlacementPolicy;
BookieSocketAddress replacedBookie;
for (int i = 0; i < numTries; i++) {
// replace node under r2
- replacedBookie = repp.replaceBookie(1, 1, 1, null, new
HashSet<>(), addr2, new HashSet<>());
+ replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
+ replacedBookie = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertTrue("replaced : " + replacedBookie,
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
}
double observedMultiple = ((double) selectionCounts.get(addr4) /
(double) selectionCounts.get(addr3));
@@ -1557,14 +1626,19 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
BookieSocketAddress replacedBookie;
+ boolean isEnsembleAdheringToPlacementPolicy;
for (int i = 0; i < numTries; i++) {
// addr2 is on /r2 and this is the only one on this rack. So the
replacement
// will come from other racks. However, the weight should be
honored in such
// selections as well
- replacedBookie = repp.replaceBookie(1, 1, 1, null, new
HashSet<>(), addr2, new HashSet<>());
+ replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
+ replacedBookie = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertTrue(addr0.equals(replacedBookie) ||
addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
|| addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
}
/*
@@ -1656,6 +1730,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
int numTries = 10000;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = 3;
int writeQuorumSize = 2;
@@ -1664,7 +1739,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
// addr2 is on /r2 and this is the only one on this rack. So the
replacement
// will come from other racks. However, the weight should be
honored in such
// selections as well
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
@@ -1726,21 +1802,23 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
repp.updateBookieInfo(bookieInfoMap);
-
- List<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ List<BookieSocketAddress> ensemble;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
try {
excludeList.add(addr1);
excludeList.add(addr2);
excludeList.add(addr3);
excludeList.add(addr4);
- ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies" + ensemble);
} catch (BKNotEnoughBookiesException e) {
// this is expected
}
try {
- ensemble = repp.newEnsemble(1, 1, 1, null, excludeList);
+ ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
} catch (BKNotEnoughBookiesException e) {
fail("Should not throw BKNotEnoughBookiesException when there are
enough bookies for the ensemble");
}
@@ -1824,13 +1902,20 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
// we will never use addr4 even it is in the stabilized network
topology
for (int i = 0; i < 5; i++) {
- List<BookieSocketAddress> ensemble =
- repp.newEnsemble(3, 3, 3, null, new
HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
assertFalse(ensemble.contains(addr4));
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
}
// we could still use addr4 for urgent allocation if it is just bookie
flapping
- List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
assertTrue(ensemble.contains(addr4));
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index d9f2535..e0fd2bd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -44,6 +46,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.Shell;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -111,7 +114,9 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertEquals(addr3, replacedBookie);
}
@@ -133,7 +138,9 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -160,7 +167,7 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not BKNotEnoughBookiesException
@@ -197,7 +204,9 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -235,7 +244,9 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -257,9 +268,13 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -281,10 +296,14 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
assertTrue(numCovered == 2);
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
assertTrue(numCovered == 2);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +334,13 @@ public class
TestRackawareEnsemblePlacementPolicyUsingScript {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception.");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index 7dc1d39..3192d04 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,8 +105,9 @@ public class TestRackawarePolicyNotificationUpdates extends
TestCase {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize,
writeQuorumSize, acqQuorumSize,
- Collections.emptyMap(), Collections.emptySet());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, Collections.emptyMap(), Collections.emptySet());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
@@ -118,8 +120,9 @@ public class TestRackawarePolicyNotificationUpdates extends
TestCase {
StaticDNSResolver.changeRack(bookieAddressList, rackList);
numOfAvailableRacks = numOfAvailableRacks + 1;
acqQuorumSize = 1;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
Collections.emptySet());
+ ensemble = ensembleResponse.getLeft();
assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
assertTrue(ensemble.contains(addr1));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index e541230..8e4f10d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -423,8 +424,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<BookieSocketAddress>(),
- addr2, new HashSet<BookieSocketAddress>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2, new
HashSet<BookieSocketAddress>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertEquals(addr3, replacedBookie);
}
@@ -449,8 +451,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
- new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -475,7 +478,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(5, 5, 3, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> list = ensembleResponse.getLeft();
LOG.info("Ensemble : {}", list);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -507,7 +512,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -536,11 +541,13 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -569,8 +576,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
assertTrue(numCovered >= 1);
assertTrue(numCovered < 3);
@@ -578,8 +586,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
try {
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -618,11 +627,13 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -669,23 +680,27 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr10);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 7);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 8);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 9);
@@ -738,8 +753,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
((SettableFeature)
featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(2, getNumRegionsInEnsemble(ensemble));
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
@@ -753,16 +769,16 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
try {
((SettableFeature)
featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
- new HashSet<BookieSocketAddress>());
+ repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
fail("Should get not enough bookies exception even there is only
one region with insufficient bookies.");
} catch (BKNotEnoughBookiesException bnebe) {
// Expected
}
try {
((SettableFeature)
featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
assert(ensemble.contains(addr4));
@@ -835,8 +851,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -847,7 +864,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
try {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr10);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null, excludedAddrs);
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
+ excludedAddrs);
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr11) && ensemble.contains(addr12));
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,9 +956,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
ackQuorum = 5;
}
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -960,9 +981,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
for (BookieSocketAddress addr: region2Bookies) {
if (ensemble.contains(addr)) {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), addr, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
+ ensemble, addr, excludedAddrs);
+ BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
ensemble.remove(addr);
ensemble.add(replacedBookie);
}
@@ -986,9 +1007,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
+ ensemble, bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
assert (replacedBookie.equals(replacedBookieExpected));
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -997,9 +1018,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
excludedAddrs.add(replacedBookieExpected);
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+ repp.replaceBookie(6, 6, ackQuorum, null, ensemble,
bookieToReplace, excludedAddrs);
if (minDurability > 1 &&
!disableDurabilityFeature.isAvailable()) {
fail("Should throw BKNotEnoughBookiesException when there
is not enough bookies");
}
@@ -1073,9 +1092,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
.set(true);
}
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 6);
} catch (BKNotEnoughBookiesException bnebe) {
LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -1086,9 +1107,7 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- repp.replaceBookie(
- 6, 6, 4, null,
- new HashSet<>(ensemble), addr4, excludedAddrs);
+ repp.replaceBookie(6, 6, 4, null, ensemble, ensemble.get(2),
excludedAddrs);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1141,8 +1160,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
excludedAddrs.add(addr10);
excludedAddrs.add(addr9);
try {
- List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null,
excludedAddrs);
- LOG.info("Ensemble : {}", list);
+ Pair<List<BookieSocketAddress>, Boolean> list =
repp.newEnsemble(5, 5, 5, null, excludedAddrs);
+ LOG.info("Ensemble : {}", list.getLeft());
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -1201,8 +1220,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
private void basicReorderReadSequenceWithLocalRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
-
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1278,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1328,7 +1350,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
prepareNetworkTopologyForReorderTests(myRegion);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);