This is an automated email from the ASF dual-hosted git repository.
reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 49135b6 Exclude defaultrack bookies when
enforceMinNumRacksPerWriteQuorum is enabled
49135b6 is described below
commit 49135b6b48a45a5fbc1fd8696dd595e4dc832da8
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Thu Feb 21 12:41:14 2019 -0800
Exclude defaultrack bookies when enforceMinNumRacksPerWriteQuorum is enabled
Descriptions of the changes in this PR:
- enforceMinNumRacksPerWriteQuorum is meant to be used for a strict placement
policy. So when it is enabled, bookies which belong to the default
faultzone/rack (because of a failure in resolving their network location)
should be excluded from bookie selection.
- add a gauge for the number of writable bookies in the default faultzone/rack.
It will be helpful for creating alerts based on this gauge.
- add a gauge for the number of ledgers found not adhering to the strict
placement policy in the Auditor's placement policy check. This gauge will be
more helpful for creating alerts than the monotonically increasing counter it
replaces.
Reviewers: Enrico Olivelli <[email protected]>, Pasha Kuznetsov <None>,
Sijie Guo <[email protected]>, Venkateswararao Jujjuri (JV) <None>
This closes #1941 from reddycharan/ignoredefaultzoneandfixmetrics
---
.../apache/bookkeeper/test/TestStatsProvider.java | 2 +-
.../bookkeeper/client/BookKeeperClientStats.java | 3 +
.../bookkeeper/client/EnsemblePlacementPolicy.java | 6 +
.../RackawareEnsemblePlacementPolicyImpl.java | 121 +++++++---
.../client/RegionAwareEnsemblePlacementPolicy.java | 25 ++-
.../TopologyAwareEnsemblePlacementPolicy.java | 2 -
.../org/apache/bookkeeper/net/NetworkTopology.java | 11 +
.../apache/bookkeeper/net/NetworkTopologyImpl.java | 12 +-
.../bookkeeper/net/StabilizeNetworkTopology.java | 6 +
.../org/apache/bookkeeper/replication/Auditor.java | 61 ++++-
.../bookkeeper/replication/ReplicationStats.java | 3 +-
.../TestRackawareEnsemblePlacementPolicy.java | 248 +++++++++++++++++++--
.../AuditorPlacementPolicyCheckTest.java | 28 ++-
13 files changed, 427 insertions(+), 101 deletions(-)
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index bef0dfe..624791c 100644
---
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -177,7 +177,7 @@ public class TestStatsProvider implements StatsProvider {
}
public Gauge<? extends Number> getGauge(String name) {
- return gaugeMap.get(path);
+ return gaugeMap.get(getSubPath(name));
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 8b358b6..cdfde67 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -93,6 +93,9 @@ public interface BookKeeperClientStats {
String FAILED_CONNECTION_COUNTER = "FAILED_CONNECTION_COUNTER";
String FAILED_TLS_HANDSHAKE_COUNTER = "FAILED_TLS_HANDSHAKE_COUNTER";
+ // placementpolicy stats
+ String NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK =
"NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK";
+
OpStatsLogger getCreateOpLogger();
OpStatsLogger getOpenOpLogger();
OpStatsLogger getDeleteOpLogger();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index fad3f92..7dc8111 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -251,6 +251,9 @@ public interface EnsemblePlacementPolicy {
* <p>{@code customMetadata} is the same user defined data that user
provides
* when {@link BookKeeper#createLedger(int, int, int,
BookKeeper.DigestType, byte[], Map)}.
*
+ * <p>If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the
bookies belonging to default
+ * faultzone (rack) will be excluded while selecting bookies.
+ *
* @param ensembleSize
* Ensemble Size
* @param writeQuorumSize
@@ -274,6 +277,9 @@ public interface EnsemblePlacementPolicy {
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie
available in the cluster,
* {@link BKNotEnoughBookiesException} is thrown.
*
+ * <p>If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the
bookies belonging to default
+ * faultzone (rack) will be excluded while selecting bookies.
+ *
* @param ensembleSize
* the value of ensembleSize
* @param writeQuorumSize
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 304db72..ef4341a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -22,6 +22,7 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static
org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK;
import static
org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
@@ -67,6 +68,7 @@ import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.net.StabilizeNetworkTopology;
import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
@@ -114,11 +116,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
final Supplier<String> defaultRackSupplier;
- // for backwards compat
- public DefaultResolver() {
- this(() -> NetworkTopology.DEFAULT_REGION_AND_RACK);
- }
-
public DefaultResolver(Supplier<String> defaultRackSupplier) {
checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
this.defaultRackSupplier = defaultRackSupplier;
@@ -240,7 +237,16 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
help = "The distribution of number of bookies reordered on each read
request"
)
protected OpStatsLogger readReorderedCounter = null;
+ @StatsDoc(
+ name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+ help = "Counter for number of times DNSResolverDecorator failed to
resolve Network Location"
+ )
protected Counter failedToResolveNetworkLocationCounter = null;
+ @StatsDoc(
+ name = NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK,
+ help = "Gauge for the number of writable Bookies in default rack"
+ )
+ protected Gauge<Integer> numWritableBookiesInDefaultRack;
private String defaultRack = NetworkTopology.DEFAULT_RACK;
@@ -282,8 +288,24 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.bookiesJoinedCounter =
statsLogger.getOpStatsLogger(BOOKIES_JOINED);
this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
this.readReorderedCounter =
statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
- this.failedToResolveNetworkLocationCounter = statsLogger
- .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
+ this.failedToResolveNetworkLocationCounter =
statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
+ this.numWritableBookiesInDefaultRack = new Gauge<Integer>() {
+ @Override
+ public Integer getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Integer getSample() {
+ rwLock.readLock().lock();
+ try {
+ return topology.countNumOfAvailableNodes(getDefaultRack(),
Collections.emptySet());
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+ };
+ this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK,
numWritableBookiesInDefaultRack);
this.reorderReadsRandom = reorderReadsRandom;
this.stabilizePeriodSeconds = stabilizePeriodSeconds;
this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
@@ -417,7 +439,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
if (node != null) {
// refresh the rack info if its a known bookie
topology.remove(node);
- topology.add(createBookieNode(bookieAddress));
+ BookieNode newNode = createBookieNode(bookieAddress);
+ topology.add(newNode);
+ knownBookies.put(bookieAddress, newNode);
}
}
} finally {
@@ -455,6 +479,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
}
+ /*
+ * this method should be called in writelock scope of 'rwLock'
+ */
@Override
public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
for (BookieSocketAddress addr : leftBookies) {
@@ -483,6 +510,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
}
+ /*
+ * this method should be called in writelock scope of 'rwLock'
+ */
@Override
public void handleBookiesThatJoined(Set<BookieSocketAddress>
joinedBookies) {
// node joined
@@ -531,26 +561,53 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
return networkLocs;
}
+ /*
+ * this method should be called in readlock scope of 'rwLock'
+ */
+ protected Set<BookieSocketAddress>
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+ Set<BookieSocketAddress> excludeBookies) {
+ Set<BookieSocketAddress> comprehensiveExclusionBookiesSet;
+ if (enforceMinNumRacksPerWriteQuorum) {
+ Set<BookieSocketAddress> bookiesInDefaultRack = null;
+ Set<Node> defaultRackLeaves = topology.getLeaves(getDefaultRack());
+ for (Node node : defaultRackLeaves) {
+ if (node instanceof BookieNode) {
+ if (bookiesInDefaultRack == null) {
+ bookiesInDefaultRack = new
HashSet<BookieSocketAddress>(excludeBookies);
+ }
+ bookiesInDefaultRack.add(((BookieNode) node).getAddr());
+ } else {
+ LOG.error("found non-BookieNode: {} as leaf of
defaultrack: {}", node, getDefaultRack());
+ }
+ }
+ if ((bookiesInDefaultRack == null) ||
bookiesInDefaultRack.isEmpty()) {
+ comprehensiveExclusionBookiesSet = excludeBookies;
+ } else {
+ comprehensiveExclusionBookiesSet = new
HashSet<BookieSocketAddress>(excludeBookies);
+ comprehensiveExclusionBookiesSet.addAll(bookiesInDefaultRack);
+ LOG.info("enforceMinNumRacksPerWriteQuorum is enabled, so
Excluding bookies of defaultRack: {}",
+ bookiesInDefaultRack);
+ }
+ } else {
+ comprehensiveExclusionBookiesSet = excludeBookies;
+ }
+ return comprehensiveExclusionBookiesSet;
+ }
+
@Override
public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
- return newEnsembleInternal(ensembleSize, writeQuorumSize,
excludeBookies, null, null);
- }
-
- protected PlacementResult<List<BookieSocketAddress>>
newEnsembleInternal(int ensembleSize,
-
int writeQuorumSize,
-
Set<BookieSocketAddress> excludeBookies,
-
Ensemble<BookieNode> parentEnsemble,
-
Predicate<BookieNode> parentPredicate)
- throws BKNotEnoughBookiesException {
- return newEnsembleInternal(
- ensembleSize,
- writeQuorumSize,
- writeQuorumSize,
- excludeBookies,
- parentEnsemble,
- parentPredicate);
+ rwLock.readLock().lock();
+ try {
+ Set<BookieSocketAddress> comprehensiveExclusionBookiesSet =
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+ excludeBookies);
+ PlacementResult<List<BookieSocketAddress>> newEnsembleResult =
newEnsembleInternal(ensembleSize,
+ writeQuorumSize, ackQuorumSize,
comprehensiveExclusionBookiesSet, null, null);
+ return newEnsembleResult;
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
@Override
@@ -643,6 +700,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
throws BKNotEnoughBookiesException {
rwLock.readLock().lock();
try {
+ excludeBookies =
addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies);
excludeBookies.addAll(currentEnsemble);
BookieNode bn = knownBookies.get(bookieToReplace);
if (null == bn) {
@@ -1253,28 +1311,31 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
}
}
+ // this method should be called in readlock scope of 'rwlock'
@Override
public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
int ackQuorumSize) {
int ensembleSize = ensembleList.size();
int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
- HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+ HashSet<String> racksInQuorum = new HashSet<String>();
BookieSocketAddress bookie;
for (int i = 0; i < ensembleList.size(); i++) {
- racksOrRegionsInQuorum.clear();
+ racksInQuorum.clear();
for (int j = 0; j < writeQuorumSize; j++) {
bookie = ensembleList.get((i + j) % ensembleSize);
try {
-
racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+
racksInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
} catch (Exception e) {
/*
- * any issue/exception in analyzing whether ensemble is
strictly adhering to
- * placement policy should be swallowed.
+ * any issue/exception in analyzing whether ensemble is
+ * strictly adhering to placement policy should be
+ * swallowed.
*/
LOG.warn("Received exception while trying to get network
location of bookie: {}", bookie, e);
}
}
- if (racksOrRegionsInQuorum.size() <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ if ((racksInQuorum.size() <
minNumRacksPerWriteQuorumForThisEnsemble)
+ || (enforceMinNumRacksPerWriteQuorum &&
racksInQuorum.contains(getDefaultRack()))) {
return false;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index cd80fdf..399e8aa 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -228,7 +228,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
@Override
public PlacementResult<List<BookieSocketAddress>> newEnsemble(int
ensembleSize, int writeQuorumSize,
- int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludeBookies)
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> excludedBookies)
throws BKException.BKNotEnoughBookiesException {
int effectiveMinRegionsForDurability =
disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -257,7 +257,9 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
rwLock.readLock().lock();
try {
- Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+ Set<BookieSocketAddress> comprehensiveExclusionBookiesSet =
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+ excludedBookies);
+ Set<Node> excludeNodes =
convertBookiesToNodes(comprehensiveExclusionBookiesSet);
Set<String> availableRegions = new HashSet<String>();
for (String region: perRegionPlacement.keySet()) {
if ((null == disallowBookiePlacementInRegionFeatureName)
@@ -294,8 +296,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
effectiveMinRegionsForDurability,
minNumRacksPerWriteQuorum);
TopologyAwareEnsemblePlacementPolicy nextPolicy =
perRegionPlacement.get(
availableRegions.iterator().next());
- return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize,
writeQuorumSize, excludeBookies, ensemble,
- ensemble);
+ return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize,
writeQuorumSize,
+ comprehensiveExclusionBookiesSet, ensemble, ensemble);
}
int remainingEnsemble = ensembleSize;
@@ -349,9 +351,10 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
int newEnsembleSize = currentAllocation.getLeft()
+ addToEnsembleSize;
int newWriteQuorumSize =
currentAllocation.getRight() + addToWriteQuorum;
try {
- List<BookieSocketAddress> allocated =
policyWithinRegion.newEnsemble(newEnsembleSize,
- newWriteQuorumSize,
newWriteQuorumSize, excludeBookies, tempEnsemble,
- tempEnsemble).getResult();
+ List<BookieSocketAddress> allocated =
policyWithinRegion
+ .newEnsemble(newEnsembleSize,
newWriteQuorumSize, newWriteQuorumSize,
+
comprehensiveExclusionBookiesSet, tempEnsemble, tempEnsemble)
+ .getResult();
ensemble = tempEnsemble;
remainingEnsemble -= addToEnsembleSize;
remainingWriteQuorum -= addToWriteQuorum;
@@ -379,12 +382,12 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
if (regionsReachedMaxAllocation.contains(region)) {
if (currentAllocation.getLeft() > 0) {
LOG.info("Allocating {} bookies in region {} :
ensemble {} exclude {}",
- currentAllocation.getLeft(), region,
excludeBookies, ensemble);
+ currentAllocation.getLeft(), region,
comprehensiveExclusionBookiesSet, ensemble);
policyWithinRegion.newEnsemble(
currentAllocation.getLeft(),
currentAllocation.getRight(),
currentAllocation.getRight(),
- excludeBookies,
+ comprehensiveExclusionBookiesSet,
ensemble,
ensemble);
LOG.info("Allocated {} bookies in region {} : {}",
@@ -428,7 +431,9 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
try {
boolean enforceDurability = enforceDurabilityInReplace &&
!disableDurabilityFeature.isAvailable();
int effectiveMinRegionsForDurability = enforceDurability ?
minRegionsForDurability : 1;
- Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+ Set<BookieSocketAddress> comprehensiveExclusionBookiesSet =
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+ excludeBookies);
+ Set<Node> excludeNodes =
convertBookiesToNodes(comprehensiveExclusionBookiesSet);
RRTopologyAwareCoverageEnsemble ensemble = new
RRTopologyAwareCoverageEnsemble(ensembleSize,
writeQuorumSize,
ackQuorumSize,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 19cb505..7fa7555 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -78,7 +78,6 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
}
protected static class BookieNode extends NodeBase {
-
private final BookieSocketAddress addr; // identifier of a bookie node.
BookieNode(BookieSocketAddress addr, String networkLoc) {
@@ -108,7 +107,6 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
public String toString() {
return String.format("<Bookie:%s>", name);
}
-
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
index a6bcf77..d2d37ea 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.net;
+import java.util.Collection;
import java.util.Set;
/**
@@ -76,4 +77,14 @@ public interface NetworkTopology {
*/
Set<Node> getLeaves(String loc);
+ /**
+ * Return the number of leaves in <i>scope</i> but not in
<i>excludedNodes</i>.
+ *
+ * <p>If scope starts with ~, return the number of nodes that are not
+ * in <i>scope</i> and <i>excludedNodes</i>;
+ * @param scope a path string that may start with ~
+ * @param excludedNodes a list of nodes
+ * @return number of available nodes
+ */
+ int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index d6756f8..dcf4cad 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -789,15 +789,7 @@ public class NetworkTopologyImpl implements
NetworkTopology {
}
}
- /**
- * Return the number of leaves in <i>scope</i> but not in
<i>excludedNodes</i>.
- *
- * <p>If scope starts with ~, return the number of nodes that are not
- * in <i>scope</i> and <i>excludedNodes</i>;
- * @param scope a path string that may start with ~
- * @param excludedNodes a list of nodes
- * @return number of available nodes
- */
+ @Override
public int countNumOfAvailableNodes(String scope, Collection<Node>
excludedNodes) {
boolean isExcluded = false;
if (scope.startsWith("~")) {
@@ -815,7 +807,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
}
}
Node n = getNode(scope);
- int scopeNodeCount = 1;
+ int scopeNodeCount = 0;
if (n instanceof InnerNode) {
scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
index 5c244f2..df80bf9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
@@ -21,6 +21,7 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -151,4 +152,9 @@ public class StabilizeNetworkTopology implements
NetworkTopology {
public Set<Node> getLeaves(String loc) {
return impl.getLeaves(loc);
}
+
+ @Override
+ public int countNumOfAvailableNodes(String scope, Collection<Node>
excludedNodes) {
+ return impl.countNumOfAvailableNodes(scope, excludedNodes);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index dd791be..ad58849 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,7 +20,6 @@
*/
package org.apache.bookkeeper.replication;
-
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static
org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME;
import static
org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME;
@@ -30,9 +29,8 @@ import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDI
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
+import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
-import static org.apache.bookkeeper.replication.ReplicationStats.
-
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
import static
org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static
org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
@@ -55,6 +53,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
@@ -79,6 +78,7 @@ import
org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -117,6 +117,8 @@ public class Auditor implements AutoCloseable {
private volatile Future<?> auditTask;
private Set<String> bookiesToBeAudited = Sets.newHashSet();
private volatile int lostBookieRecoveryDelayBeforeChange;
+ private final AtomicInteger ledgersNotAdheringToPlacementPolicyGuageValue;
+ private final AtomicInteger numOfLedgersFoundInPlacementPolicyCheck;
private final StatsLogger statsLogger;
@StatsDoc(
@@ -175,11 +177,10 @@ public class Auditor implements AutoCloseable {
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
- name =
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
- help = "total number of "
- + "ledgers failed to adhere to EnsemblePlacementPolicy found in
PLACEMENT POLICY check"
+ name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
+ help = "Gauge for number of ledgers not adhering to placement
policy found in placement policy check"
)
- private final Counter
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy;
+ private final Gauge<Integer> numLedgersNotAdheringToPlacementPolicy;
static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws
InterruptedException, IOException {
return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -230,6 +231,8 @@ public class Auditor implements AutoCloseable {
this.conf = conf;
this.bookieIdentifier = bookieIdentifier;
this.statsLogger = statsLogger;
+ this.numOfLedgersFoundInPlacementPolicyCheck = new AtomicInteger(0);
+ this.ledgersNotAdheringToPlacementPolicyGuageValue = new
AtomicInteger(0);
numUnderReplicatedLedger =
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
uRLPublishTimeForLostBookies = this.statsLogger
@@ -245,8 +248,20 @@ public class Auditor implements AutoCloseable {
numBookieAuditsDelayed =
this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
- placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = statsLogger
-
.getCounter(ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+ numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
+ @Override
+ public Integer getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Integer getSample() {
+ return ledgersNotAdheringToPlacementPolicyGuageValue.get();
+ }
+ };
+
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
+ numLedgersNotAdheringToPlacementPolicy);
+
this.bkc = bkc;
this.ownBkc = ownBkc;
initialize(conf, bkc);
@@ -612,12 +627,33 @@ public class Auditor implements AutoCloseable {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting PlacementPolicyCheck");
placementPolicyCheck();
+ int numOfLedgersFoundInPlacementPolicyCheckValue =
numOfLedgersFoundInPlacementPolicyCheck
+ .get();
long placementPolicyCheckDuration =
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
- LOG.info("Completed placementPolicyCheck in {}
milliSeconds", placementPolicyCheckDuration);
+ LOG.info(
+ "Completed placementPolicyCheck in {}
milliSeconds."
+ + " numOfLedgersNotAdheringToPlacementPolicy
{}",
+ placementPolicyCheckDuration,
numOfLedgersFoundInPlacementPolicyCheckValue);
+ ledgersNotAdheringToPlacementPolicyGuageValue
+
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
TimeUnit.MILLISECONDS);
} catch (BKAuditException e) {
- LOG.error("BKAuditException running periodic
placementPolicy check", e);
+ int numOfLedgersFoundInPlacementPolicyCheckValue =
numOfLedgersFoundInPlacementPolicyCheck
+ .get();
+ if (numOfLedgersFoundInPlacementPolicyCheckValue > 0) {
+ /*
+ * Though there is BKAuditException while doing
+ * placementPolicyCheck, it found few ledgers not
+ * adhering to placement policy. So reporting it.
+ */
+ ledgersNotAdheringToPlacementPolicyGuageValue
+
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
+ }
+ LOG.error(
+ "BKAuditException running periodic
placementPolicy check."
+ +
"numOfLedgersNotAdheringToPlacementPolicy {}",
+ numOfLedgersFoundInPlacementPolicyCheckValue,
e);
}
}
}, initialDelay, interval, TimeUnit.SECONDS);
@@ -893,6 +929,7 @@ public class Auditor implements AutoCloseable {
void placementPolicyCheck() throws BKAuditException {
final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
+ this.numOfLedgersFoundInPlacementPolicyCheck.set(0);
Processor<Long> ledgerProcessor = new Processor<Long>() {
@Override
public void process(Long ledgerId, AsyncCallback.VoidCallback
iterCallback) {
@@ -920,7 +957,7 @@ public class Auditor implements AutoCloseable {
}
}
if (foundSegmentNotAdheringToPlacementPolicy) {
-
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.inc();
+
numOfLedgersFoundInPlacementPolicyCheck.incrementAndGet();
}
} else {
if (LOG.isDebugEnabled()) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index e9b8999..c13bc56 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -40,8 +40,7 @@ public interface ReplicationStats {
String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
String NUM_BOOKIE_AUDITS_DELAYED = "NUM_BOOKIE_AUDITS_DELAYED";
String NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED =
"NUM_DELAYED_BOOKIE_AUDITS_CANCELLED";
- String
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER =
-
"PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
+ String NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY =
"NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY";
String REPLICATION_WORKER_SCOPE = "replication_worker";
String REREPLICATE_OP = "rereplicate";
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index a55b560..26fa23f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -29,6 +29,7 @@ import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,8 +51,12 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.collections4.CollectionUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -740,17 +745,22 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
@Test
public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
+ String defaultRackForThisTest =
NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
+ updateMyRack(defaultRackForThisTest);
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
- repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(),
timer, DISABLE_ALL,
- NullStatsLogger.INSTANCE);
- repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(),
timer, DISABLE_ALL, statsLogger);
+ repp.withDefaultRack(defaultRackForThisTest);
+ Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
@@ -764,26 +774,41 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
}
}
- repp.onClusterChanged(new
HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
- new HashSet<BookieSocketAddress>());
+ int numOfBookiesInDefaultRack = 5;
+ BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new
BookieSocketAddress[numOfBookiesInDefaultRack];
+ for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+ bookieSocketAddressesInDefaultRack[i] = new
BookieSocketAddress("128.0.0." + (100 + i), 3181);
+
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+ defaultRackForThisTest);
+ }
+
+ List<BookieSocketAddress> nonDefaultRackBookiesList =
Arrays.asList(bookieSocketAddresses);
+ List<BookieSocketAddress> defaultRackBookiesList =
Arrays.asList(bookieSocketAddressesInDefaultRack);
+ Set<BookieSocketAddress> writableBookies = new
HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
+ writableBookies.addAll(defaultRackBookiesList);
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
try {
+ // this newEnsemble call will exclude default rack bookies
repp.newEnsemble(8, 4, 4, null, new HashSet<>());
- fail("Should get not enough bookies exception since there are only
3 racks");
+ fail("Should get not enough bookies exception since there are only
3 non-default racks");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
- repp.newEnsemble(8, 4, 4, new HashSet<>(),
+ repp.newEnsemble(8, 4, 4, new HashSet<>(defaultRackBookiesList),
EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
- fail("Should get not enough bookies exception since there are only
3 racks");
+ fail("Should get not enough bookies exception since there are only
3 non-default racks"
+ + " and defaultrack bookies are excluded");
} catch (BKNotEnoughBookiesException bnebe) {
}
/*
* Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
- * and there are enough bookies in 3 racks, this newEnsemble calls
should
- * succeed.
+ * and there are enough bookies in 3 racks, this newEnsemble calls
+ * should succeed.
*/
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
List<BookieSocketAddress> ensemble;
@@ -799,8 +824,9 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
assertTrue(isEnsembleAdheringToPlacementPolicy);
- ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, new HashSet<>(),
- EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize,
+ new HashSet<>(defaultRackBookiesList),
EnsembleForReplacementWithNoConstraints.INSTANCE,
+ TruePredicate.INSTANCE);
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
@@ -872,17 +898,23 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
@Test
public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
+ String defaultRackForThisTest =
NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
+ updateMyRack(defaultRackForThisTest);
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(),
timer, DISABLE_ALL,
- NullStatsLogger.INSTANCE);
- repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ statsLogger);
+ repp.withDefaultRack(defaultRackForThisTest);
+ Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
@@ -900,8 +932,25 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
bookieRackMap.put(bookieAddress, rack);
}
}
+ /*
+ * Bookies in the default rack must not be returned in a replaceBookie
+ * response.
+ */
+ int numOfBookiesInDefaultRack = 5;
+ BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new
BookieSocketAddress[numOfBookiesInDefaultRack];
+ for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+ bookieSocketAddressesInDefaultRack[i] = new
BookieSocketAddress("127.0.0." + (i + 100), 3181);
+
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+ defaultRackForThisTest);
+ }
- repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
+ Set<BookieSocketAddress> nonDefaultRackBookiesList =
bookieSocketAddresses;
+ List<BookieSocketAddress> defaultRackBookiesList =
Arrays.asList(bookieSocketAddressesInDefaultRack);
+ Set<BookieSocketAddress> writableBookies = new
HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
+ writableBookies.addAll(defaultRackBookiesList);
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
/*
* Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
@@ -927,9 +976,12 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
*/
StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(),
rackOfOtherBookieInEnsemble);
bookieSocketAddresses.add(newBookieAddress1);
+ writableBookies.add(newBookieAddress1);
bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
- repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null,
ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
@@ -947,9 +999,12 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
*/
StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(),
newRack);
bookieSocketAddresses.add(newBookieAddress2);
+ writableBookies.add(newBookieAddress2);
bookieRackMap.put(newBookieAddress2, newRack);
- repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
/*
* this replaceBookie should succeed, because a new bookie is added to
a
* new rack.
@@ -966,7 +1021,9 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
bookiesToExclude.add(newBookieAddress2);
- repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
null, ensemble,
bookieInEnsembleToBeReplaced, bookiesToExclude);
@@ -984,9 +1041,12 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
*/
StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(),
rackOfBookieToBeReplaced);
bookieSocketAddresses.add(newBookieAddress3);
+ writableBookies.add(newBookieAddress3);
bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
- repp.onClusterChanged(bookieSocketAddresses, new
HashSet<BookieSocketAddress>());
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
/*
* here we have added new bookie to the rack of the bookie to be
* replaced, so we should be able to replacebookie though
@@ -2023,4 +2083,154 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
assertTrue(shuffleOccurred);
}
+ @Test
+ public void testNumBookiesInDefaultRackGauge() throws Exception {
+ String defaultRackForThisTest =
NetworkTopology.DEFAULT_REGION_AND_RACK;
+ repp.uninitalize();
+ updateMyRack(defaultRackForThisTest);
+
+ // Update cluster
+ BookieSocketAddress newAddr1 = new BookieSocketAddress("127.0.0.100",
3181);
+ BookieSocketAddress newAddr2 = new BookieSocketAddress("127.0.0.101",
3181);
+ BookieSocketAddress newAddr3 = new BookieSocketAddress("127.0.0.102",
3181);
+ BookieSocketAddress newAddr4 = new BookieSocketAddress("127.0.0.103",
3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(newAddr1.getHostName(),
defaultRackForThisTest);
+ StaticDNSResolver.addNodeToRack(newAddr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(newAddr3.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(newAddr4.getHostName(),
defaultRackForThisTest);
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, statsLogger);
+ repp.withDefaultRack(defaultRackForThisTest);
+
+ Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
+
+ Set<BookieSocketAddress> writeableBookies = new
HashSet<BookieSocketAddress>();
+ writeableBookies.add(newAddr1);
+ writeableBookies.add(newAddr2);
+ Set<BookieSocketAddress> readOnlyBookies = new
HashSet<BookieSocketAddress>();
+ readOnlyBookies.add(newAddr3);
+ readOnlyBookies.add(newAddr4);
+ repp.onClusterChanged(writeableBookies, readOnlyBookies);
+ // only writable bookie - newAddr1 in default rack
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1,
numBookiesInDefaultRackGauge.getSample());
+
+ readOnlyBookies.remove(newAddr4);
+ writeableBookies.add(newAddr4);
+ repp.onClusterChanged(writeableBookies, readOnlyBookies);
+ // newAddr4 is now writable too, so there are 2 writable bookies in the
+ // default rack - newAddr1 and newAddr4
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 2,
numBookiesInDefaultRackGauge.getSample());
+
+ // newAddr4 rack is changed and it is not in default anymore
+ StaticDNSResolver.changeRack(Arrays.asList(newAddr4),
Arrays.asList("/default-region/r4"));
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1,
numBookiesInDefaultRackGauge.getSample());
+
+ writeableBookies.clear();
+ // writeableBookies is empty so 0 writable bookies in default rack
+ repp.onClusterChanged(writeableBookies, readOnlyBookies);
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0,
numBookiesInDefaultRackGauge.getSample());
+
+ StaticDNSResolver.changeRack(Arrays.asList(newAddr1),
Arrays.asList("/default-region/r2"));
+ readOnlyBookies.clear();
+ writeableBookies.add(newAddr1);
+ writeableBookies.add(newAddr2);
+ writeableBookies.add(newAddr3);
+ writeableBookies.add(newAddr4);
+ repp.onClusterChanged(writeableBookies, readOnlyBookies);
+ // newAddr1 rack is changed and it is not in default anymore. So no
+ // bookies in default rack anymore
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0,
numBookiesInDefaultRackGauge.getSample());
+ }
+
+ @Test
+ public void testNewEnsembleExcludesDefaultRackBookiesEnforceMinNumRacks()
throws Exception {
+ String defaultRackForThisTest =
NetworkTopology.DEFAULT_REGION_AND_RACK;
+ repp.uninitalize();
+ updateMyRack(defaultRackForThisTest);
+ int minNumRacksPerWriteQuorum = 4;
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ // set enforceMinNumRacksPerWriteQuorum
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(),
timer, DISABLE_ALL, statsLogger);
+ repp.withDefaultRack(defaultRackForThisTest);
+ Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
+
+ int writeQuorumSize = 3;
+ int ackQuorumSize = 3;
+ int effectiveMinNumRacksPerWriteQuorum =
Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
+
+ int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
+ int numOfBookiesPerRack = 20;
+ BookieSocketAddress[] bookieSocketAddresses = new
BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+ for (int i = 0; i < numOfRacks; i++) {
+ for (int j = 0; j < numOfBookiesPerRack; j++) {
+ int index = i * numOfBookiesPerRack + j;
+ bookieSocketAddresses[index] = new
BookieSocketAddress("128.0.0." + index, 3181);
+
StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(),
"/default-region/r" + i);
+ }
+ }
+
+ int numOfBookiesInDefaultRack = 10;
+ BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new
BookieSocketAddress[numOfBookiesInDefaultRack];
+ for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+ bookieSocketAddressesInDefaultRack[i] = new
BookieSocketAddress("127.0.0." + (i + 100), 3181);
+
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+ defaultRackForThisTest);
+ }
+
+ Set<BookieSocketAddress> writableBookies = new
HashSet<BookieSocketAddress>(
+ Arrays.asList(bookieSocketAddresses));
+
writableBookies.addAll(Arrays.asList(bookieSocketAddressesInDefaultRack));
+ repp.onClusterChanged(writableBookies, new
HashSet<BookieSocketAddress>());
+ assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value",
numOfBookiesInDefaultRack,
+ numBookiesInDefaultRackGauge.getSample());
+
+ /*
+ * In this scenario we have enough racks (2 *
+ * effectiveMinNumRacksPerWriteQuorum - 1) and plenty of bookies in
+ * each rack. So we should be able to create an ensemble for all
+ * ensembleSizes (as long as there are enough bookies in each
+ * rack).
+ *
+ * Since minNumRacksPerWriteQuorum is enforced, it shouldn't select
+ * nodes from the default rack.
+ */
+ EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse;
+ List<BookieSocketAddress> ensemble;
+ boolean isEnsembleAdheringToPlacementPolicy;
+ for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum;
ensembleSize < 40; ensembleSize++) {
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
+
+ ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
+ ensemble = ensembleResponse.getResult();
+ isEnsembleAdheringToPlacementPolicy =
ensembleResponse.isStrictlyAdheringToPolicy();
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
clientConf.getMinNumRacksPerWriteQuorum()));
+ assertTrue(isEnsembleAdheringToPlacementPolicy);
+ Collection<BookieSocketAddress> bookiesOfDefaultRackInEnsemble =
CollectionUtils
+
.intersection(Arrays.asList(bookieSocketAddressesInDefaultRack), ensemble);
+ assertTrue("Ensemble is not supposed to contain bookies from
default rack, but ensemble contains - "
+ + bookiesOfDefaultRackInEnsemble,
bookiesOfDefaultRackInEnsemble.isEmpty());
+ }
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 68b5df2..69134a3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -52,10 +52,10 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.AuditorPeriodicCheckTest.TestAuditor;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
-import org.apache.bookkeeper.test.TestStatsProvider.TestCounter;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
@@ -182,14 +182,14 @@ public class AuditorPlacementPolicyCheckTest extends
BookKeeperClusterTestCase {
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger =
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
- TestCounter
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter)
statsLogger.getCounter(
-
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+ Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage =
statsLogger
+
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
/*
* since all of the bookies are in different racks, there
shouldn't be any ledger not adhering
* to placement policy.
*/
-
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
SuccessCount", 0L,
-
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+ assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage
value", 0,
+ ledgersNotAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
@@ -263,11 +263,10 @@ public class AuditorPlacementPolicyCheckTest extends
BookKeeperClusterTestCase {
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger =
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
- TestCounter
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter)
statsLogger.getCounter(
-
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
-
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
SuccessCount",
- (long) numOfLedgersNotAdheringToPlacementPolicy,
-
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+ Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage =
statsLogger
+
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+ assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage
value",
+ numOfLedgersNotAdheringToPlacementPolicy,
ledgersNotAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
@@ -362,11 +361,10 @@ public class AuditorPlacementPolicyCheckTest extends
BookKeeperClusterTestCase {
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger =
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
- TestCounter
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter)
statsLogger.getCounter(
-
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
-
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
SuccessCount",
- (long) numOfLedgersNotAdheringToPlacementPolicy,
-
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+ Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage =
statsLogger
+
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+ assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY gauge
value",
+ numOfLedgersNotAdheringToPlacementPolicy,
ledgersNotAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {