sijie closed pull request #1883: Enhance EnsemblePlacementPolicy and
DNSResolverDecorator to log relevant metrics.
URL: https://github.com/apache/bookkeeper/pull/1883
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cdafd15a99..b58514b670 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -69,6 +69,8 @@
String WATCHER_SCOPE = "bookie_watcher";
String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+ String FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER =
"FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER";
+ String ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER =
"ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
// Bookie Operations
String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 46978ea5c6..7b7cc46bcf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -36,6 +36,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,9 +81,9 @@ public void uninitalize() {
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
- BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ java.util.Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
throw new BKNotEnoughBookiesException();
}
@@ -109,18 +110,24 @@ public void registerSlowBookie(BookieSocketAddress
bookieSocketAddress, long ent
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, java.util.Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
if (ensembleSize > 1) {
throw new IllegalArgumentException("Local ensemble policy can only
return 1 bookie");
}
- return Lists.newArrayList(bookieAddress);
+ return Pair.of(Lists.newArrayList(bookieAddress), true);
}
@Override
public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo>
bookieToFreeSpaceMap) {
return;
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 3f837dbaf7..7b492922f1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,7 +35,6 @@
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -82,6 +81,7 @@
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -994,15 +994,25 @@ private void asyncRecoverLedgerFragment(final
LedgerHandle lh,
// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
- BookieSocketAddress newBookie =
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata(),
- new HashSet<>(ensemble),
+ ensemble,
oldBookie,
bookiesToExclude);
+ BookieSocketAddress newBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "replaceBookie for bookie: {} in ensemble: {} "
+ + "is not adhering to placement policy and
chose {}",
+ oldBookie, ensemble, newBookie);
+ }
+ }
targetBookieAddresses.put(bookieIndex, newBookie);
bookiesToExclude.add(newBookie);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index 8b14b77174..74c1df984b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
@@ -35,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.client.BKException.BKInterruptedException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -43,9 +46,11 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.commons.lang3.tuple.Pair;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -88,6 +93,12 @@
help = "operation stats of replacing bookie in an ensemble"
)
private final OpStatsLogger replaceBookieTimer;
+ @StatsDoc(
+ name = ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
+ help = "total number of newEnsemble/replaceBookie operations
failed to adhere"
+ + " EnsemblePlacementPolicy"
+ )
+ private final Counter ensembleNotAdheringToPlacementPolicy;
// Bookies that will not be preferred to be chosen in a new ensemble
final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -117,6 +128,8 @@ public void
onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie)
}).build();
this.newEnsembleTimer =
statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
this.replaceBookieTimer =
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
+ this.ensembleNotAdheringToPlacementPolicy = statsLogger
+
.getCounter(BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
}
@Override
@@ -213,19 +226,34 @@ public void initialBlockingBookieRead() throws
BKException {
int ackQuorumSize, Map<String, byte[]> customMetadata)
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
+ Pair<List<BookieSocketAddress>, Boolean> newEnsembleResponse;
List<BookieSocketAddress> socketAddresses;
+ boolean isEnsembleAdheringToPlacementPolicy = false;
try {
- socketAddresses = placementPolicy.newEnsemble(ensembleSize,
- writeQuorumSize, ackQuorumSize, customMetadata, new
HashSet<BookieSocketAddress>(
- quarantinedBookies.asMap().keySet()));
+ Set<BookieSocketAddress> quarantinedBookiesSet =
quarantinedBookies.asMap().keySet();
+ newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize,
writeQuorumSize, ackQuorumSize,
+ customMetadata, new
HashSet<BookieSocketAddress>(quarantinedBookiesSet));
+ socketAddresses = newEnsembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn("New ensemble: {} is not adhering to Placement
Policy. quarantinedBookies: {}",
+ socketAddresses, quarantinedBookiesSet);
+ }
// we try to only get from the healthy bookies first
newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
} catch (BKNotEnoughBookiesException e) {
if (log.isDebugEnabled()) {
log.debug("Not enough healthy bookies available, using
quarantined bookies");
}
- socketAddresses = placementPolicy.newEnsemble(
+ newEnsembleResponse = placementPolicy.newEnsemble(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, new HashSet<>());
+ socketAddresses = newEnsembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
newEnsembleResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn("New ensemble: {} is not adhering to Placement
Policy", socketAddresses);
+ }
newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
}
return socketAddresses;
@@ -239,22 +267,43 @@ public BookieSocketAddress replaceBookie(int
ensembleSize, int writeQuorumSize,
throws BKNotEnoughBookiesException {
long startTime = MathUtils.nowInNano();
BookieSocketAddress addr = existingBookies.get(bookieIdx);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
BookieSocketAddress socketAddress;
+ boolean isEnsembleAdheringToPlacementPolicy = false;
try {
// we exclude the quarantined bookies also first
- Set<BookieSocketAddress> existingAndQuarantinedBookies = new
HashSet<BookieSocketAddress>(existingBookies);
-
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
- socketAddress = placementPolicy.replaceBookie(
+ Set<BookieSocketAddress> excludedBookiesAndQuarantinedBookies =
new HashSet<BookieSocketAddress>(
+ excludeBookies);
+ Set<BookieSocketAddress> quarantinedBookiesSet =
quarantinedBookies.asMap().keySet();
+ excludedBookiesAndQuarantinedBookies.addAll(quarantinedBookiesSet);
+ replaceBookieResponse = placementPolicy.replaceBookie(
ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- existingAndQuarantinedBookies, addr, excludeBookies);
+ existingBookies, addr,
excludedBookiesAndQuarantinedBookies);
+ socketAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn(
+ "replaceBookie for bookie: {} in ensemble: {} is not
adhering to placement policy and"
+ + " chose {}. excludedBookies {} and
quarantinedBookies {}",
+ addr, existingBookies, socketAddress, excludeBookies,
quarantinedBookiesSet);
+ }
replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
} catch (BKNotEnoughBookiesException e) {
if (log.isDebugEnabled()) {
log.debug("Not enough healthy bookies available, using
quarantined bookies");
}
- socketAddress = placementPolicy.replaceBookie(
- ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- new HashSet<BookieSocketAddress>(existingBookies), addr,
excludeBookies);
+ replaceBookieResponse =
placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ customMetadata, existingBookies, addr, excludeBookies);
+ socketAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
+ if (!isEnsembleAdheringToPlacementPolicy) {
+ ensembleNotAdheringToPlacementPolicy.inc();
+ log.warn(
+ "replaceBookie for bookie: {} in ensemble: {} is not
adhering to placement policy and"
+ + " chose {}. excludedBookies {}",
+ addr, existingBookies, socketAddress, excludeBookies);
+ }
replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
}
return socketAddress;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 917d18f717..dddbe1cb40 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -39,6 +39,7 @@
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,12 +65,12 @@
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int
quorumSize, int ackQuorumSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
ArrayList<BookieSocketAddress> newBookies = new
ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
- return newBookies;
+ return Pair.of(newBookies, false);
}
List<BookieSocketAddress> allBookies;
rwLock.readLock().lock();
@@ -95,7 +96,8 @@
newBookies.add(b);
--ensembleSize;
if (ensembleSize == 0) {
- return newBookies;
+ return Pair.of(newBookies,
+
isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
}
}
} finally {
@@ -110,7 +112,8 @@
newBookies.add(bookie);
--ensembleSize;
if (ensembleSize == 0) {
- return newBookies;
+ return Pair.of(newBookies,
+ isEnsembleAdheringToPlacementPolicy(newBookies,
quorumSize, ackQuorumSize));
}
}
}
@@ -118,13 +121,17 @@
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
excludeBookies.addAll(currentEnsemble);
- ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies);
- return addresses.get(0);
+ List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1,
customMetadata, excludeBookies).getLeft();
+
+ BookieSocketAddress candidateAddr = addresses.get(0);
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ return Pair.of(candidateAddr,
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize,
ackQuorumSize));
}
@Override
@@ -210,4 +217,10 @@ public void updateBookieInfo(Map<BookieSocketAddress,
BookieInfo> bookieInfoMap)
public void uninitalize() {
// do nothing
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 23932a3d7d..00bac8e3c4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
/**
* {@link EnsemblePlacementPolicy} encapsulates the algorithm that bookkeeper
client uses to select a number of bookies
@@ -263,12 +264,12 @@ EnsemblePlacementPolicy initialize(ClientConfiguration
conf,
* @throws BKNotEnoughBookiesException if not enough bookies available.
* @return the List<org.apache.bookkeeper.net.BookieSocketAddress>
*/
- List<BookieSocketAddress> newEnsemble(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]>
customMetadata,
- Set<BookieSocketAddress>
excludeBookies)
- throws BKNotEnoughBookiesException;
+ Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+
Set<BookieSocketAddress> excludeBookies)
+ throws BKNotEnoughBookiesException;
/**
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie
available in the cluster,
@@ -287,14 +288,14 @@ EnsemblePlacementPolicy initialize(ClientConfiguration
conf,
* @throws BKNotEnoughBookiesException
* @return the org.apache.bookkeeper.net.BookieSocketAddress
*/
- BookieSocketAddress replaceBookie(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Map<String, byte[]> customMetadata,
- Set<BookieSocketAddress> currentEnsemble,
- BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies)
- throws BKNotEnoughBookiesException;
+ Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Map<String, byte[]>
customMetadata,
+ List<BookieSocketAddress>
currentEnsemble,
+ BookieSocketAddress
bookieToReplace,
+ Set<BookieSocketAddress>
excludeBookies)
+ throws BKNotEnoughBookiesException;
/**
* Register a bookie as slow so that it is tried after available and
read-only bookies.
@@ -386,4 +387,25 @@ default int getStickyReadBookieIndex(LedgerMetadata
metadata, Optional<Integer>
return (currentStickyBookieIndex.get() + 1) %
metadata.getEnsembleSize();
}
}
+
+ /**
+ * Returns true if the ensemble strictly adheres to the placement policy.
+ * For example, in the case of RackawareEnsemblePlacementPolicy, the bookies
+ * in the writeset are from 'minNumRacksPerWriteQuorum' number of racks. In
+ * the case of RegionAwareEnsemblePlacementPolicy, the check covers
+ * minimumRegionsForDurability, reppRegionsToWrite, rack distribution within
+ * a region and other parameters of RegionAwareEnsemblePlacementPolicy.
+ *
+ * @param ensembleList
+ * list of BookieSocketAddress of bookies in the ensemble
+ * @param writeQuorumSize
+ * writeQuorumSize of the ensemble
+ * @param ackQuorumSize
+ * ackQuorumSize of the ensemble
+ * @return true if the ensemble adheres to the placement policy, false otherwise
+ */
+ default boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ return false;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 7c9e07cd10..254f5359c2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.Node;
+import org.apache.commons.lang3.tuple.Pair;
/**
* Interface for topology aware ensemble placement policy.
@@ -93,7 +94,7 @@
* @return list of bookies forming the ensemble
* @throws BKException.BKNotEnoughBookiesException
*/
- List<BookieSocketAddress> newEnsemble(
+ Pair<List<BookieSocketAddress>, Boolean> newEnsemble(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1fd7580c81..8054d97187 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -18,19 +18,16 @@
package org.apache.bookkeeper.client;
import io.netty.util.HashedWheelTimer;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
-import
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
-import
org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
/**
* A placement policy implementation use rack information for placing
ensembles.
@@ -95,8 +92,8 @@ public void uninitalize() {
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
return super.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, customMetadata, excludeBookies);
@@ -110,8 +107,8 @@ public void uninitalize() {
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
@@ -146,7 +143,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
int writeQuorumSize,
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieSocketAddress>
excludeBookies,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7578c416ee..6db7de8f72 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
import static
org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
@@ -35,6 +36,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,10 +66,12 @@
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,12 +150,19 @@ public void reloadCachedMappings() {
final Supplier<String> defaultRackSupplier;
final DNSToSwitchMapping resolver;
-
- DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier) {
+ @StatsDoc(
+ name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+ help = "total number of times Resolver failed to resolve rack
information of a node"
+ )
+ final Counter failedToResolveNetworkLocationCounter;
+
+ DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier,
+ Counter failedToResolveNetworkLocationCounter) {
checkNotNull(resolver, "Resolver cannot be null");
checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
this.defaultRackSupplier = defaultRackSupplier;
this.resolver = resolver;
+ this.failedToResolveNetworkLocationCounter =
failedToResolveNetworkLocationCounter;
}
public List<String> resolve(List<String> names) {
@@ -167,6 +178,7 @@ public void reloadCachedMappings() {
if (rNames.get(i) == null) {
LOG.warn("Failed to resolve network location for {},
using default rack for it : {}.",
names.get(i), defaultRack);
+ failedToResolveNetworkLocationCounter.inc();
rNames.set(i, defaultRack);
}
}
@@ -178,6 +190,7 @@ public void reloadCachedMappings() {
rNames = new ArrayList<>(names.size());
for (int i = 0; i < names.size(); ++i) {
+ failedToResolveNetworkLocationCounter.inc();
rNames.add(defaultRack);
}
return rNames;
@@ -227,6 +240,7 @@ public void reloadCachedMappings() {
help = "The distribution of number of bookies reordered on each read
request"
)
protected OpStatsLogger readReorderedCounter = null;
+ protected Counter failedToResolveNetworkLocationCounter = null;
private String defaultRack = NetworkTopology.DEFAULT_RACK;
@@ -267,10 +281,13 @@ protected RackawareEnsemblePlacementPolicyImpl
initialize(DNSToSwitchMapping dns
this.bookiesJoinedCounter =
statsLogger.getOpStatsLogger(BOOKIES_JOINED);
this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
this.readReorderedCounter =
statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
+ this.failedToResolveNetworkLocationCounter = statsLogger
+ .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
this.reorderReadsRandom = reorderReadsRandom;
this.stabilizePeriodSeconds = stabilizePeriodSeconds;
this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
- this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack());
+ this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack(),
+ failedToResolveNetworkLocationCounter);
this.timer = timer;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
this.enforceMinNumRacksPerWriteQuorum =
enforceMinNumRacksPerWriteQuorum;
@@ -342,9 +359,18 @@ public RackawareEnsemblePlacementPolicyImpl
initialize(ClientConfiguration conf,
((RackChangeNotifier)
dnsResolver).registerRackChangeListener(this);
}
} catch (RuntimeException re) {
- LOG.info("Failed to initialize DNS Resolver {}, used default
subnet resolver : {}",
- dnsResolverName, re, re.getMessage());
- dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
+ if (!enforceMinNumRacksPerWriteQuorum) {
+ LOG.info("Failed to initialize DNS Resolver {}, used
default subnet resolver : {}", dnsResolverName,
+ re, re.getMessage());
+ dnsResolver = new DefaultResolver(() ->
this.getDefaultRack());
+ } else {
+ /*
+ * if minNumRacksPerWriteQuorum is enforced, then it
+ * shouldn't continue in the case of failure to create
+ * dnsResolver.
+ */
+ throw re;
+ }
}
}
slowBookies = CacheBuilder.newBuilder()
@@ -479,7 +505,7 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
}
}
- protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress>
excludeBookies) {
+ protected Set<Node> convertBookiesToNodes(Collection<BookieSocketAddress>
excludeBookies) {
Set<Node> nodes = new HashSet<Node>();
for (BookieSocketAddress addr : excludeBookies) {
BookieNode bn = knownBookies.get(addr);
@@ -500,13 +526,13 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(ensembleSize, writeQuorumSize,
excludeBookies, null, null);
}
- protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+ protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(int
ensembleSize,
int
writeQuorumSize,
Set<BookieSocketAddress> excludeBookies,
Ensemble<BookieNode> parentEnsemble,
@@ -522,7 +548,7 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
}
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieSocketAddress>
excludeBookies,
@@ -538,7 +564,7 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
parentPredicate);
}
- protected List<BookieSocketAddress> newEnsembleInternal(
+ protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
@@ -572,7 +598,7 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return addrs;
+ return Pair.of(addrs, false);
}
for (int i = 0; i < ensembleSize; i++) {
@@ -596,15 +622,15 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
ensembleSize, bookieList);
throw new BKNotEnoughBookiesException();
}
- return bookieList;
+ return Pair.of(bookieList,
isEnsembleAdheringToPlacementPolicy(bookieList, writeQuorumSize,
ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
rwLock.readLock().lock();
@@ -639,7 +665,19 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
int writeQuorumSize,
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.",
candidate, bn);
}
- return candidate.getAddr();
+ BookieSocketAddress candidateAddr = candidate.getAddr();
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ if (currentEnsemble.isEmpty()) {
+ /*
+ * in testing code there are test cases which would pass empty
+ * currentEnsemble
+ */
+ newEnsemble.add(candidateAddr);
+ } else {
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ }
+ return Pair.of(candidateAddr,
+ isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
@@ -1206,4 +1244,32 @@ static void
shuffleWithMask(DistributionSchedule.WriteSet writeSet,
}
}
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ int ensembleSize = ensembleList.size();
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+ BookieSocketAddress bookie;
+ for (int i = 0; i < ensembleList.size(); i++) {
+ racksOrRegionsInQuorum.clear();
+ for (int j = 0; j < writeQuorumSize; j++) {
+ bookie = ensembleList.get((i + j) % ensembleSize);
+ try {
+
racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+ } catch (Exception e) {
+ /*
+ * any issue/exception in analyzing whether ensemble is
strictly adhering to
+ * placement policy should be swallowed.
+ */
+ LOG.warn("Received exception while trying to get network
location of bookie: {}", bookie, e);
+ }
+ }
+ if (racksOrRegionsInQuorum.size() <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index c52e0fee85..1bd4b75eac 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,8 +225,8 @@ public RegionAwareEnsemblePlacementPolicy
initialize(ClientConfiguration conf,
@Override
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
int effectiveMinRegionsForDurability =
disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -279,7 +279,7 @@ public RegionAwareEnsemblePlacementPolicy
initialize(ClientConfiguration conf,
for (BookieNode bn : bns) {
addrs.add(bn.getAddr());
}
- return addrs;
+ return Pair.of(addrs,
isEnsembleAdheringToPlacementPolicy(addrs, writeQuorumSize, ackQuorumSize));
}
// Single region, fall back to RackAwareEnsemblePlacement
@@ -347,7 +347,7 @@ public RegionAwareEnsemblePlacementPolicy
initialize(ClientConfiguration conf,
try {
List<BookieSocketAddress> allocated =
policyWithinRegion.newEnsemble(newEnsembleSize,
newWriteQuorumSize,
newWriteQuorumSize, excludeBookies, tempEnsemble,
- tempEnsemble);
+ tempEnsemble).getLeft();
ensemble = tempEnsemble;
remainingEnsemble -= addToEnsembleSize;
remainingWriteQuorum -= addToWriteQuorum;
@@ -407,15 +407,17 @@ public RegionAwareEnsemblePlacementPolicy
initialize(ClientConfiguration conf,
throw new BKException.BKNotEnoughBookiesException();
}
LOG.info("Bookies allocated successfully {}", ensemble);
- return ensemble.toList();
+ List<BookieSocketAddress> ensembleList = ensemble.toList();
+ return Pair.of(ensembleList,
+ isEnsembleAdheringToPlacementPolicy(ensembleList,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata, List<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
rwLock.readLock().lock();
@@ -469,7 +471,19 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
int writeQuorumSize,
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.",
candidate, bookieNodeToReplace);
}
- return candidate.getAddr();
+ BookieSocketAddress candidateAddr = candidate.getAddr();
+ List<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>(currentEnsemble);
+ if (currentEnsemble.isEmpty()) {
+ /*
+ * in testing code there are test cases which would pass empty
+ * currentEnsemble
+ */
+ newEnsemble.add(candidateAddr);
+ } else {
+ newEnsemble.set(currentEnsemble.indexOf(bookieToReplace),
candidateAddr);
+ }
+ return Pair.of(candidateAddr,
+ isEnsembleAdheringToPlacementPolicy(newEnsemble,
writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
@@ -550,4 +564,16 @@ protected BookieNode replaceFromRack(BookieNode
bookieNodeToReplace,
finalList.addMissingIndices(ensemble.size());
return finalList;
}
+
+ @Override
+ public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
+ int ackQuorumSize) {
+ /**
+ * TODO: have to implement actual logic for this method for
+ * RegionAwareEnsemblePlacementPolicy. For now return true value.
+ *
+ * - https://github.com/apache/bookkeeper/issues/1898
+ */
+ return true;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 8c8350c3fc..230f66d411 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -25,8 +25,8 @@
import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class implements the {@link DNSToSwitchMapping} interface using a
@@ -129,7 +129,7 @@ public void setConf(Configuration conf) {
private static final class RawScriptBasedMapping extends
AbstractDNSToSwitchMapping {
private String scriptName;
private int maxArgs; //max hostnames per call of the script
- private static final Log LOG =
LogFactory.getLog(ScriptBasedMapping.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ScriptBasedMapping.class);
/**
* Set the configuration and extract the configuration parameters of
interest.
@@ -233,7 +233,7 @@ private String runResolveCommand(List<String> args) {
s.execute();
allOutput.append(s.getOutput()).append(" ");
} catch (Exception e) {
- LOG.warn("Exception running " + s, e);
+ LOG.warn("Exception running: {} Exception message: {}", s,
e.getMessage());
return null;
}
loopCount++;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index bb55d0c6d0..205c5f4cb9 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -33,6 +33,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -69,8 +70,8 @@ public GenericEnsemblePlacementPolicyTest(boolean
diskWeightBasedPlacementEnable
public static final class CustomEnsemblePlacementPolicy extends
DefaultEnsemblePlacementPolicy {
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize,
- int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> currentEnsemble,
+ public Pair<BookieSocketAddress, Boolean> replaceBookie(int
ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
List<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
new Exception("replaceBookie " + ensembleSize + "," +
customMetadata).printStackTrace();
@@ -81,7 +82,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
int writeQuorumSize,
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
int quorumSize,
+ public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int
ensembleSize, int quorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
assertNotNull(customMetadata);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e75e93fa86..e6cd07bcc7 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -52,6 +52,7 @@
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@
ClientConfiguration conf = new ClientConfiguration();
BookieSocketAddress addr1, addr2, addr3, addr4;
io.netty.util.HashedWheelTimer timer;
+ final int minNumRacksPerWriteQuorumConfValue = 2;
@Override
protected void setUp() throws Exception {
@@ -80,6 +82,7 @@ protected void setUp() throws Exception {
StaticDNSResolver.addNodeToRack("localhost",
NetworkTopology.DEFAULT_REGION_AND_RACK);
LOG.info("Set up static DNS Resolver.");
conf.setProperty(REPP_DNS_RESOLVER_CLASS,
StaticDNSResolver.class.getName());
+ conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
addr1 = new BookieSocketAddress("127.0.0.2", 3181);
addr2 = new BookieSocketAddress("127.0.0.3", 3181);
addr3 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -564,8 +567,12 @@ public void testReplaceBookieWithEnoughBookiesInSameRack()
throws Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertEquals(addr3, replacedBookie);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -589,10 +596,13 @@ public void
testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
-
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -619,7 +629,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws
Exception {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -628,7 +638,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws
Exception {
@Test
public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble()
throws Exception {
- BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -645,15 +655,18 @@ public void
testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() throws Exce
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- Set<BookieSocketAddress> ensembleBookies = new
HashSet<BookieSocketAddress>();
+ List<BookieSocketAddress> ensembleBookies = new
ArrayList<BookieSocketAddress>();
ensembleBookies.add(addr2);
ensembleBookies.add(addr4);
- BookieSocketAddress replacedBookie = repp.replaceBookie(
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(
1, 1, 1 , null,
ensembleBookies,
addr4,
new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertEquals(addr1, replacedBookie);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -670,10 +683,18 @@ public void testNewEnsembleWithSingleRack() throws
Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2,
conf.getMinNumRacksPerWriteQuorum()));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2;
+ ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new
HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum()));
+ assertFalse(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -703,17 +724,19 @@ public void testSingleRackWithEnforceMinNumRacks() throws
Exception {
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, new
HashSet<>());
+ ensemble = ensembleResponse.getLeft();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
- ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
- TruePredicate.INSTANCE);
+ ensembleResponse = repp.newEnsemble(3, 2, 2, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
fail("Should get not enough bookies exception since there is only
one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
@@ -766,19 +789,27 @@ public void testNewEnsembleWithEnforceMinNumRacks()
throws Exception {
* and there are enough bookies in 3 racks, this newEnsemble calls
should
* succeed.
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
+ boolean isEnsembleAdheringToPlacementPolicy;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -822,16 +853,24 @@ public void
testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exc
* ensembleSizes (as long as there are enough number of bookies in each
* rack).
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
+ boolean isEnsembleAdheringToPlacementPolicy;
for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum;
ensembleSize < 40; ensembleSize++) {
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensemble = ensembleResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
}
@@ -873,12 +912,14 @@ public void testReplaceBookieWithEnforceMinNumRacks()
throws Exception {
* and there are enough bookies in 3 racks, this newEnsemble call
should
* succeed.
*/
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getLeft();
BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
// get rack of some other bookie
@@ -895,7 +936,7 @@ public void testReplaceBookieWithEnforceMinNumRacks()
throws Exception {
repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, new HashSet<>());
+ ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
fail("Should get not enough bookies exception since there are no
more bookies in rack"
+ "of 'bookieInEnsembleToReplace'"
+ "and new bookie added belongs to the rack of some other
bookie in the ensemble");
@@ -917,16 +958,22 @@ public void testReplaceBookieWithEnforceMinNumRacks()
throws Exception {
* this replaceBookie should succeed, because a new bookie is added to
a
* new rack.
*/
- BookieSocketAddress replacedBookieAddress =
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
- null, new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ BookieSocketAddress replacedBookieAddress;
+ boolean isEnsembleAdheringToPlacementPolicy;
+ replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null, ensemble,
+ bookieInEnsembleToBeReplaced, new HashSet<>());
+ replacedBookieAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
assertEquals("It should be newBookieAddress2", newBookieAddress2,
replacedBookieAddress);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
bookiesToExclude.add(newBookieAddress2);
repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
try {
- repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, bookiesToExclude);
+ repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null, ensemble,
+ bookieInEnsembleToBeReplaced, bookiesToExclude);
fail("Should get not enough bookies exception since the only
available bookie to replace"
+ "is added to excludedBookies list");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -949,9 +996,12 @@ public void testReplaceBookieWithEnforceMinNumRacks()
throws Exception {
* replaced, so we should be able to replacebookie though
* newBookieAddress2 is added to excluded bookies list.
*/
- replacedBookieAddress = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null,
- new HashSet<BookieSocketAddress>(ensemble),
bookieInEnsembleToBeReplaced, bookiesToExclude);
+ replaceBookieResponse = repp.replaceBookie(ensembleSize,
writeQuorumSize, ackQuorumSize, null,
+ ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
+ replacedBookieAddress = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
assertEquals("It should be newBookieAddress3", newBookieAddress3,
replacedBookieAddress);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -1298,15 +1348,21 @@ public void testNewEnsembleWithMultipleRacks() throws
Exception {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- List<BookieSocketAddress> ensemble =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
int numCovered = getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
ensembleSize = 4;
- List<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
+ assertFalse(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1375,11 +1431,13 @@ void
validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress>
repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
- List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize,
writeQuorumSize, writeQuorumSize, null,
- new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ writeQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
minNumRacksPerWriteQuorum);
assertEquals("minimum number of racks covered for writequorum
ensemble: " + ensemble, ensembleSize, numCovered);
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
}
@Test
@@ -1417,16 +1475,22 @@ public void testNewEnsembleWithEnoughRacks() throws
Exception {
int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
- List<BookieSocketAddress> ensemble1 =
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
- null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy1 =
ensembleResponse.getRight();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble1, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy1);
ensembleSize = 4;
writeQuorumSize = 4;
- List<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
- new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ 2, null, new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy2 =
ensembleResponse2.getRight();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1497,11 +1561,16 @@ public void
testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() t
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+ boolean isEnsembleAdheringToPlacementPolicy;
BookieSocketAddress replacedBookie;
for (int i = 0; i < numTries; i++) {
// replace node under r2
- replacedBookie = repp.replaceBookie(1, 1, 1, null, new
HashSet<>(), addr2, new HashSet<>());
+ replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
+ replacedBookie = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertTrue("replaced : " + replacedBookie,
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
}
double observedMultiple = ((double) selectionCounts.get(addr4) /
(double) selectionCounts.get(addr3));
@@ -1557,14 +1626,19 @@ public void
testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack(
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
BookieSocketAddress replacedBookie;
+ boolean isEnsembleAdheringToPlacementPolicy;
for (int i = 0; i < numTries; i++) {
// addr2 is on /r2 and this is the only one on this rack. So the
replacement
// will come from other racks. However, the weight should be
honored in such
// selections as well
- replacedBookie = repp.replaceBookie(1, 1, 1, null, new
HashSet<>(), addr2, new HashSet<>());
+ replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new
ArrayList<>(), addr2, new HashSet<>());
+ replacedBookie = replaceBookieResponse.getLeft();
+ isEnsembleAdheringToPlacementPolicy =
replaceBookieResponse.getRight();
assertTrue(addr0.equals(replacedBookie) ||
addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
|| addr4.equals(replacedBookie));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
}
/*
@@ -1656,6 +1730,7 @@ public void
testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr
int numTries = 10000;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = 3;
int writeQuorumSize = 2;
@@ -1664,7 +1739,8 @@ public void
testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr
// addr2 is on /r2 and this is the only one on this rack. So the
replacement
// will come from other racks. However, the weight should be
honored in such
// selections as well
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
@@ -1726,21 +1802,23 @@ public void
testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc
bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
repp.updateBookieInfo(bookieInfoMap);
-
- List<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+ List<BookieSocketAddress> ensemble;
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
try {
excludeList.add(addr1);
excludeList.add(addr2);
excludeList.add(addr3);
excludeList.add(addr4);
- ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+ ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies" + ensemble);
} catch (BKNotEnoughBookiesException e) {
// this is expected
}
try {
- ensemble = repp.newEnsemble(1, 1, 1, null, excludeList);
+ ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
+ ensemble = ensembleResponse.getLeft();
} catch (BKNotEnoughBookiesException e) {
fail("Should not throw BKNotEnoughBookiesException when there are
enough bookies for the ensemble");
}
@@ -1824,13 +1902,20 @@ public void testPlacementOnStabilizeNetworkTopology()
throws Exception {
// we will never use addr4 even it is in the stabilized network
topology
for (int i = 0; i < 5; i++) {
- List<BookieSocketAddress> ensemble =
- repp.newEnsemble(3, 3, 3, null, new
HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
assertFalse(ensemble.contains(addr4));
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
}
// we could still use addr4 for urgent allocation if it is just bookie
flapping
- List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+ boolean isEnsembleAdheringToPlacementPolicy =
ensembleResponse.getRight();
+ assertFalse(isEnsembleAdheringToPlacementPolicy);
assertTrue(ensemble.contains(addr4));
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index d9f253507e..e0fd2bdffe 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,6 +30,8 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -44,6 +46,7 @@
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.Shell;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -111,7 +114,9 @@ public void testReplaceBookieWithEnoughBookiesInSameRack()
throws Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, new HashSet<>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, new HashSet<>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertEquals(addr3, replacedBookie);
}
@@ -133,7 +138,9 @@ public void
testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -160,7 +167,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws
Exception {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not BKNotEnoughBookiesException
@@ -197,7 +204,9 @@ public void testReplaceBookieWithScriptMappingError()
throws Exception {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -235,7 +244,9 @@ public void testReplaceBookieWithScriptMappingError2()
throws Exception {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+ addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertFalse(addr2.equals(replacedBookie));
@@ -257,9 +268,13 @@ public void testNewEnsembleWithSingleRack() throws
Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -281,10 +296,14 @@ public void testNewEnsembleWithMultipleRacks() throws
Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
assertTrue(numCovered == 2);
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
assertTrue(numCovered == 2);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +334,13 @@ public void testNewEnsembleWithEnoughRacks() throws
Exception {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null, new HashSet<>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
+ new HashSet<>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception.");
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index 7dc1d39d1d..3192d049c1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -42,6 +42,7 @@
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,8 +105,9 @@ public void testNotifyRackChange() throws Exception {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
- List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize,
writeQuorumSize, acqQuorumSize,
- Collections.emptyMap(), Collections.emptySet());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
+ acqQuorumSize, Collections.emptyMap(), Collections.emptySet());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
@@ -118,8 +120,9 @@ public void testNotifyRackChange() throws Exception {
StaticDNSResolver.changeRack(bookieAddressList, rackList);
numOfAvailableRacks = numOfAvailableRacks + 1;
acqQuorumSize = 1;
- ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
Collections.emptySet());
+ ensemble = ensembleResponse.getLeft();
assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()));
assertTrue(ensemble.contains(addr1));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index e541230e35..8e4f10d5b3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -51,6 +51,7 @@
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -423,8 +424,9 @@ public void
testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
new HashSet<BookieSocketAddress>(),
- addr2, new HashSet<BookieSocketAddress>());
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2, new
HashSet<BookieSocketAddress>());
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertEquals(addr3, replacedBookie);
}
@@ -449,8 +451,9 @@ public void
testReplaceBookieWithEnoughBookiesInDifferentRegion() throws Excepti
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
- new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null,
+ new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
@@ -475,7 +478,9 @@ public void testNewEnsembleBookieWithNotEnoughBookies()
throws Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(5, 5, 3, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> list = ensembleResponse.getLeft();
LOG.info("Ensemble : {}", list);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -507,7 +512,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws
Exception {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, null, new
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -536,11 +541,13 @@ public void testNewEnsembleWithSingleRegion() throws
Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -569,8 +576,9 @@ public void testNewEnsembleWithMultipleRegions() throws
Exception {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
assertTrue(numCovered >= 1);
assertTrue(numCovered < 3);
@@ -578,8 +586,9 @@ public void testNewEnsembleWithMultipleRegions() throws
Exception {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
try {
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -618,11 +627,13 @@ public void testNewEnsembleWithEnoughRegions() throws
Exception {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 =
repp.newEnsemble(3, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
- List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 =
repp.newEnsemble(4, 2, 2, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -669,23 +680,27 @@ public void testNewEnsembleWithThreeRegions() throws
Exception {
addrs.add(addr10);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(7, 7, 4, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 7);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(8, 8, 5, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 8);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(9, 9, 5, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 9);
@@ -738,8 +753,9 @@ public void testNewEnsembleWithThreeRegionsWithDisable()
throws Exception {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
((SettableFeature)
featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(2, getNumRegionsInEnsemble(ensemble));
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
@@ -753,16 +769,16 @@ public void testNewEnsembleWithThreeRegionsWithDisable()
throws Exception {
}
try {
((SettableFeature)
featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
- new HashSet<BookieSocketAddress>());
+ repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
fail("Should get not enough bookies exception even there is only
one region with insufficient bookies.");
} catch (BKNotEnoughBookiesException bnebe) {
// Expected
}
try {
((SettableFeature)
featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(6, 6, 4, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
assert(ensemble.contains(addr4));
@@ -835,8 +851,9 @@ public void testNewEnsembleWithFiveRegions() throws
Exception {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null,
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -847,7 +864,9 @@ public void testNewEnsembleWithFiveRegions() throws
Exception {
try {
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
excludedAddrs.add(addr10);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10,
null, excludedAddrs);
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(10, 10, 10, null,
+ excludedAddrs);
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assert(ensemble.contains(addr11) && ensemble.contains(addr12));
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,9 +956,11 @@ public void
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
ackQuorum = 5;
}
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(6, 6, ackQuorum, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -960,9 +981,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int
minDurability, boole
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
for (BookieSocketAddress addr: region2Bookies) {
if (ensemble.contains(addr)) {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), addr, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
+ ensemble, addr, excludedAddrs);
+ BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
ensemble.remove(addr);
ensemble.add(replacedBookie);
}
@@ -986,9 +1007,9 @@ public void
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+ Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
repp.replaceBookie(6, 6, ackQuorum, null,
+ ensemble, bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie =
replaceBookieResponse.getLeft();
assert (replacedBookie.equals(replacedBookieExpected));
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -997,9 +1018,7 @@ public void
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
excludedAddrs.add(replacedBookieExpected);
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(
- 6, 6, ackQuorum, null,
- new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+ repp.replaceBookie(6, 6, ackQuorum, null, ensemble,
bookieToReplace, excludedAddrs);
if (minDurability > 1 &&
!disableDurabilityFeature.isAvailable()) {
fail("Should throw BKNotEnoughBookiesException when there
is not enough bookies");
}
@@ -1073,9 +1092,11 @@ public void testEnsembleDurabilityDisabledInternal(int
minDurability, boolean di
.set(true);
}
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
List<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
+ ensembleResponse = repp.newEnsemble(6, 6, 4, null, new
HashSet<BookieSocketAddress>());
+ ensemble = ensembleResponse.getLeft();
assert(ensemble.size() == 6);
} catch (BKNotEnoughBookiesException bnebe) {
LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -1086,9 +1107,7 @@ public void testEnsembleDurabilityDisabledInternal(int
minDurability, boolean di
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- repp.replaceBookie(
- 6, 6, 4, null,
- new HashSet<>(ensemble), addr4, excludedAddrs);
+ repp.replaceBookie(6, 6, 4, null, ensemble, ensemble.get(2),
excludedAddrs);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -1141,8 +1160,8 @@ public void testNewEnsembleFailWithFiveRegions() throws
Exception {
excludedAddrs.add(addr10);
excludedAddrs.add(addr9);
try {
- List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null,
excludedAddrs);
- LOG.info("Ensemble : {}", list);
+ Pair<List<BookieSocketAddress>, Boolean> list =
repp.newEnsemble(5, 5, 5, null, excludedAddrs);
+ LOG.info("Ensemble : {}", list.getLeft());
fail("Should throw BKNotEnoughBookiesException when there is not
enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -1201,8 +1220,9 @@ public void
testBasicReorderReadLACSequenceWithLocalRegion() throws Exception {
private void basicReorderReadSequenceWithLocalRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
-
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1278,9 @@ public void
testBasicReorderReadLACSequenceWithRemoteRegion() throws Exception {
private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion,
boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1328,7 +1350,9 @@ private void
reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(boolean isR
prepareNetworkTopologyForReorderTests(myRegion);
- List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null,
new HashSet<BookieSocketAddress>());
+ Pair<List<BookieSocketAddress>, Boolean> ensembleResponse =
repp.newEnsemble(9, 9, 5, null,
+ new HashSet<BookieSocketAddress>());
+ List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services