This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9ba4c4e Bookies should be from different racks in a Writequorum.
9ba4c4e is described below
commit 9ba4c4e0d8be770e03110a958fb8b75a65ae0f59
Author: cguttapalem <[email protected]>
AuthorDate: Mon May 21 14:53:43 2018 -0700
Bookies should be from different racks in a Writequorum.
Descriptions of the changes in this PR:
Ideally, RackawareEnsemblePlacementPolicy should try to select
bookies from different racks for a write quorum. That is, the
bookies in a write quorum should come from as many distinct racks
as the write quorum size, provided there is a sufficient number of
available racks and bookies.
Author: cguttapalem <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1397 from reddycharan/configracks
---
.../client/RackawareEnsemblePlacementPolicy.java | 8 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 67 ++++++++++-
.../client/RegionAwareEnsemblePlacementPolicy.java | 12 +-
.../TopologyAwareEnsemblePlacementPolicy.java | 55 ++++++++--
.../bookkeeper/conf/AbstractConfiguration.java | 16 +++
.../apache/bookkeeper/net/NetworkTopologyImpl.java | 8 +-
.../TestRackawareEnsemblePlacementPolicy.java | 122 ++++++++++++++++++---
.../TestRackawarePolicyNotificationUpdates.java | 20 +++-
8 files changed, 262 insertions(+), 46 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index def9554..32f94f3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -52,15 +52,17 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
int
stabilizePeriodSeconds,
boolean isWeighted,
int
maxWeightMultiple,
+ int
minNumRacksPerWriteQuorum,
StatsLogger
statsLogger) {
if (stabilizePeriodSeconds > 0) {
- super.initialize(dnsResolver, timer, reorderReadsRandom, 0,
isWeighted, maxWeightMultiple, statsLogger);
+ super.initialize(dnsResolver, timer, reorderReadsRandom, 0,
isWeighted, maxWeightMultiple,
+ minNumRacksPerWriteQuorum, statsLogger);
slave = new
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
slave.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds, isWeighted,
- maxWeightMultiple, statsLogger);
+ maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
} else {
super.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds, isWeighted,
- maxWeightMultiple, statsLogger);
+ maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
slave = null;
}
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 568debf..92f8701 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,6 +79,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
int maxWeightMultiple;
private Map<BookieNode, WeightedObject> bookieInfoMap = new
HashMap<BookieNode, WeightedObject>();
private WeightedRandomSelection<BookieNode> weightedSelection;
+ private int minNumRacksPerWriteQuorum;
public static final String REPP_DNS_RESOLVER_CLASS =
"reppDnsResolverClass";
public static final String REPP_RANDOM_READ_REORDERING =
"ensembleRandomReadReordering";
@@ -233,6 +234,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
int
stabilizePeriodSeconds,
boolean
isWeighted,
int
maxWeightMultiple,
+ int
minNumRacksPerWriteQuorum,
StatsLogger
statsLogger) {
checkNotNull(statsLogger, "statsLogger should not be null, use
NullStatsLogger instead.");
this.statsLogger = statsLogger;
@@ -242,6 +244,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.stabilizePeriodSeconds = stabilizePeriodSeconds;
this.dnsResolver = new DNSResolverDecorator(dnsResolver, () ->
this.getDefaultRack());
this.timer = timer;
+ this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
// create the network topology
if (stabilizePeriodSeconds > 0) {
@@ -329,6 +332,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
conf.getNetworkTopologyStabilizePeriodSeconds(),
conf.getDiskWeightBasedPlacementEnabled(),
conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
+ conf.getMinNumRacksPerWriteQuorum(),
statsLogger);
}
@@ -512,6 +516,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
rwLock.readLock().lock();
try {
Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
RRTopologyAwareCoverageEnsemble ensemble =
new RRTopologyAwareCoverageEnsemble(
ensembleSize,
@@ -519,7 +524,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
ackQuorumSize,
RACKNAME_DISTANCE_FROM_LEAVES,
parentEnsemble,
- parentPredicate);
+ parentPredicate,
+ minNumRacksPerWriteQuorumForThisEnsemble);
BookieNode prevNode = null;
int numRacks = topology.getNumOfRacks();
// only one rack, use the random algorithm.
@@ -532,7 +538,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
return addrs;
}
- // pick nodes by racks, to ensure there is at least two racks per
write quorum.
+ // pick nodes by racks, to ensure there is at least write quorum
number of racks.
+ int idx = 0;
+ String[] racks = new String[ensembleSize];
for (int i = 0; i < ensembleSize; i++) {
String curRack;
if (null == prevNode) {
@@ -542,9 +550,62 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
curRack = localNode.getNetworkLocation();
}
} else {
- curRack = "~" + prevNode.getNetworkLocation();
+ StringBuilder sb = new StringBuilder();
+ sb.append("~");
+
+ if (writeQuorumSize > 1) {
+ /*
+ * RackAwareEnsemblePlacementPolicy should try to
select
+ * bookies from atleast
+ * minNumRacksPerWriteQuorumForThisEnsemble number of
+ * different racks for a write quorum. So in a
+ * WriteQuorum, bookies should be from
+ * minNumRacksPerWriteQuorumForThisEnsemble number of
+ * racks. So we would add racks of
+ * (minNumRacksPerWriteQuorumForThisEnsemble-1)
+ * neighbours (both sides) to the exclusion list
+ * (~curRack).
+ */
+ for (int j = 1; j <
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+ int nextIndex = i + j;
+ if (nextIndex >= ensembleSize) {
+ nextIndex %= ensembleSize;
+ }
+ /*
+ * if racks[nextIndex] is null, then it means
bookie
+ * is not yet selected for ensemble at 'nextIndex'
+ * index.
+ */
+ if (racks[nextIndex] != null) {
+ if (!((sb.length() == 1) && (sb.charAt(0) ==
'~'))) {
+
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+ }
+ sb.append(racks[nextIndex]);
+ }
+ }
+
+ for (int j = 1; j <
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+ int nextIndex = i - j;
+ if (nextIndex < 0) {
+ nextIndex += ensembleSize;
+ }
+ /*
+ * if racks[nextIndex] is null, then it means
bookie
+ * is not yet selected for ensemble at 'nextIndex'
+ * index.
+ */
+ if (racks[nextIndex] != null) {
+ if (!((sb.length() == 1) && (sb.charAt(0) ==
'~'))) {
+
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+ }
+ sb.append(racks[nextIndex]);
+ }
+ }
+ }
+ curRack = sb.toString();
}
prevNode = selectFromNetworkLocation(curRack, excludeNodes,
ensemble, ensemble);
+ racks[i] = prevNode.getNetworkLocation();
}
ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
if (ensembleSize != bookieList.size()) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 0b804e7..6e2e742 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -75,6 +75,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
protected boolean enableValidation = true;
protected boolean enforceDurabilityInReplace = false;
protected Feature disableDurabilityFeature;
+ protected int minNumRacksPerWriteQuorum;
RegionAwareEnsemblePlacementPolicy() {
super();
@@ -130,7 +131,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
if (null == perRegionPlacement.get(region)) {
perRegionPlacement.put(region, new
RackawareEnsemblePlacementPolicy()
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
- this.isWeighted, this.maxWeightMultiple,
statsLogger)
+ this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
@@ -178,7 +179,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
for (String region: regions) {
perRegionPlacement.put(region, new
RackawareEnsemblePlacementPolicy(true)
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
- this.isWeighted, this.maxWeightMultiple,
statsLogger)
+ this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
minRegionsForDurability =
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -202,6 +203,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME,
BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT));
}
+ this.minNumRacksPerWriteQuorum = conf.getMinNumRacksPerWriteQuorum();
return this;
}
@@ -285,7 +287,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
RRTopologyAwareCoverageEnsemble ensemble = new
RRTopologyAwareCoverageEnsemble(ensembleSize,
writeQuorumSize, ackQuorumSize,
REGIONID_DISTANCE_FROM_LEAVES,
effectiveMinRegionsForDurability > 0 ? new
HashSet<>(perRegionPlacement.keySet()) : null,
- effectiveMinRegionsForDurability);
+ effectiveMinRegionsForDurability,
minNumRacksPerWriteQuorum);
TopologyAwareEnsemblePlacementPolicy nextPolicy =
perRegionPlacement.get(
availableRegions.iterator().next());
return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize,
writeQuorumSize, excludeBookies, ensemble,
@@ -316,7 +318,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
// regardless of regions that are available;
constraints are
// always applied based on all possible regions
effectiveMinRegionsForDurability > 0 ? new
HashSet<>(perRegionPlacement.keySet()) : null,
- effectiveMinRegionsForDurability);
+ effectiveMinRegionsForDurability,
minNumRacksPerWriteQuorum);
remainingEnsembleBeforeIteration = remainingEnsemble;
int regionsToAllocate = numRemainingRegions;
for (Map.Entry<String, Pair<Integer, Integer>> regionEntry:
regionsWiseAllocation.entrySet()) {
@@ -426,7 +428,7 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
ackQuorumSize,
REGIONID_DISTANCE_FROM_LEAVES,
effectiveMinRegionsForDurability > 0 ? new
HashSet<String>(perRegionPlacement.keySet()) : null,
- effectiveMinRegionsForDurability);
+ effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace);
if (null == bookieNodeToReplace) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index b70cb0f..85917b1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -125,6 +125,11 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
protected class RackQuorumCoverageSet implements CoverageSet {
HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
int seenBookies = 0;
+ private final int minNumRacksPerWriteQuorum;
+
+ protected RackQuorumCoverageSet(int minNumRacksPerWriteQuorum) {
+ this.minNumRacksPerWriteQuorum = Math.min(writeQuorumSize,
minNumRacksPerWriteQuorum);
+ }
@Override
public boolean apply(BookieNode candidate) {
@@ -134,9 +139,29 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
return true;
}
- if (seenBookies + 1 == writeQuorumSize) {
- return racksOrRegionsInQuorum.size()
- >
(racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves))
? 1 : 0);
+ /*
+ * allow the initial writeQuorumSize-minRacksToWriteTo+1
bookies
+ * to be placed on any rack(including on a single rack). But
+ * after that make sure that with each new bookie chosen, we
+ * will be able to satisfy the minRackToWriteTo condition
+ * eventually
+ */
+ if (seenBookies + minNumRacksPerWriteQuorum - 1 >=
writeQuorumSize) {
+ int numRacks = racksOrRegionsInQuorum.size();
+ if
(!racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)))
{
+ numRacks++;
+ }
+ if (numRacks >= minNumRacksPerWriteQuorum
+ || ((writeQuorumSize - seenBookies - 1) >=
(minNumRacksPerWriteQuorum - numRacks))) {
+ /*
+ * either we have reached our goal or we still have a
+ * few bookies to be selected with which to catch up to
+ * the goal
+ */
+ return true;
+ } else {
+ return false;
+ }
}
return true;
}
@@ -149,7 +174,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
@Override
public RackQuorumCoverageSet duplicate() {
- RackQuorumCoverageSet ret = new RackQuorumCoverageSet();
+ RackQuorumCoverageSet ret = new
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
ret.racksOrRegionsInQuorum =
Sets.newHashSet(this.racksOrRegionsInQuorum);
ret.seenBookies = this.seenBookies;
return ret;
@@ -275,6 +300,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
final int writeQuorumSize;
final int ackQuorumSize;
final int minRacksOrRegionsForDurability;
+ final int minNumRacksPerWriteQuorum;
final ArrayList<BookieNode> chosenNodes;
final Set<String> racksOrRegions;
private final CoverageSet[] quorums;
@@ -303,6 +329,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
this.racksOrRegions = null;
}
this.minRacksOrRegionsForDurability =
that.minRacksOrRegionsForDurability;
+ this.minNumRacksPerWriteQuorum = that.minNumRacksPerWriteQuorum;
}
protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -310,9 +337,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
int ackQuorumSize,
int distanceFromLeaves,
Set<String> racksOrRegions,
- int
minRacksOrRegionsForDurability) {
- this(ensembleSize, writeQuorumSize, ackQuorumSize,
distanceFromLeaves, null, null,
- racksOrRegions, minRacksOrRegionsForDurability);
+ int
minRacksOrRegionsForDurability,
+ int
minNumRacksPerWriteQuorum) {
+ this(ensembleSize, writeQuorumSize, ackQuorumSize,
distanceFromLeaves, null, null, racksOrRegions,
+ minRacksOrRegionsForDurability, minNumRacksPerWriteQuorum);
}
protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -320,9 +348,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
int ackQuorumSize,
int distanceFromLeaves,
Ensemble<BookieNode>
parentEnsemble,
- Predicate<BookieNode>
parentPredicate) {
+ Predicate<BookieNode>
parentPredicate,
+ int
minNumRacksPerWriteQuorum) {
this(ensembleSize, writeQuorumSize, ackQuorumSize,
distanceFromLeaves, parentEnsemble, parentPredicate,
- null, 0);
+ null, 0, minNumRacksPerWriteQuorum);
}
protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -332,7 +361,8 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
Ensemble<BookieNode>
parentEnsemble,
Predicate<BookieNode>
parentPredicate,
Set<String> racksOrRegions,
- int
minRacksOrRegionsForDurability) {
+ int
minRacksOrRegionsForDurability,
+ int
minNumRacksPerWriteQuorum) {
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
@@ -347,6 +377,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
this.parentPredicate = parentPredicate;
this.racksOrRegions = racksOrRegions;
this.minRacksOrRegionsForDurability =
minRacksOrRegionsForDurability;
+ this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
}
@Override
@@ -377,7 +408,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
if (minRacksOrRegionsForDurability > 0) {
quorums[idx] = new
RackOrRegionDurabilityCoverageSet();
} else {
- quorums[idx] = new RackQuorumCoverageSet();
+ quorums[idx] = new
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
}
}
if (!quorums[idx].apply(candidate)) {
@@ -410,7 +441,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
if (minRacksOrRegionsForDurability > 0) {
quorums[idx] = new
RackOrRegionDurabilityCoverageSet();
} else {
- quorums[idx] = new RackQuorumCoverageSet();
+ quorums[idx] = new
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
}
}
quorums[idx].addBookie(node);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 4805c34..9d6cb50 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -141,6 +141,9 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
// Validate bookie process user
public static final String PERMITTED_STARTUP_USERS =
"permittedStartupUsers";
+ // minimum number of racks per write quorum
+ public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM =
"minNumRacksPerWriteQuorum";
+
protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
@@ -779,6 +782,19 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
return getString(TLS_ENABLED_PROTOCOLS, null);
}
+ /**
+ * Set the minimum number of racks per write quorum.
+ */
+ public void setMinNumRacksPerWriteQuorum(int minNumRacksPerWriteQuorum) {
+ setProperty(MIN_NUM_RACKS_PER_WRITE_QUORUM, minNumRacksPerWriteQuorum);
+ }
+
+ /**
+ * Get the minimum number of racks per write quorum.
+ */
+ public int getMinNumRacksPerWriteQuorum() {
+ return getInteger(MIN_NUM_RACKS_PER_WRITE_QUORUM, 2);
+ }
/**
* Trickery to allow inheritance with fluent style.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index f004879..d6756f8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.net;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -43,6 +44,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
public static final int DEFAULT_HOST_LEVEL = 2;
public static final Logger LOG =
LoggerFactory.getLogger(NetworkTopologyImpl.class);
+ public static final String NODE_SEPARATOR = ",";
/**
* A marker for an InvalidTopology Exception.
@@ -772,7 +774,11 @@ public class NetworkTopologyImpl implements
NetworkTopology {
try {
if (scope.startsWith("~")) {
Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
- Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
+ String[] excludeScopes =
scope.substring(1).split(NODE_SEPARATOR);
+ Set<Node> excludeNodes = new HashSet<Node>();
+ Arrays.stream(excludeScopes).forEach((excludeScope) -> {
+ excludeNodes.addAll(doGetLeaves(excludeScope));
+ });
allNodes.removeAll(excludeNodes);
return allNodes;
} else {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1d32c13..ad9c9c8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -498,9 +498,9 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
2, null, new HashSet<>());
- assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
+ assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2,
conf.getMinNumRacksPerWriteQuorum()));
ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, null, new HashSet<>());
- assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
+ assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum()));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -525,11 +525,17 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2,
2, null, new HashSet<>());
- int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
+ int ensembleSize = 3;
+ int writeQuorumSize = 2;
+ int acqQuorumSize = 2;
+ ArrayList<BookieSocketAddress> ensemble =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+ null, new HashSet<>());
+ int numCovered = getNumCoveredWriteQuorums(ensemble, 2,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, null, new HashSet<>());
- numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
+ ensembleSize = 4;
+ ArrayList<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+ null, new HashSet<>());
+ numCovered = getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
@@ -537,6 +543,76 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
}
@Test
+ public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
+ int numOfRacksToCreate = 6;
+ int numOfNodesInEachRack = 5;
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ BookieSocketAddress addr;
+ for (int i = 0; i < numOfRacksToCreate; i++) {
+ for (int j = 0; j < numOfNodesInEachRack; j++) {
+ addr = new BookieSocketAddress("128.0.0." + ((i *
numOfNodesInEachRack) + j), 3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr.getHostName(),
"/default-region/r" + i);
+ addrs.add(addr);
+ }
+ }
+
+ try {
+ ClientConfiguration newConf = new ClientConfiguration(conf);
+ // set MinNumRacksPerWriteQuorum to 4
+ int minNumRacksPerWriteQuorum = 4;
+ int ensembleSize = 12;
+ int writeQuorumSize = 6;
+ validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs,
minNumRacksPerWriteQuorum, ensembleSize,
+ writeQuorumSize);
+
+ // set MinNumRacksPerWriteQuorum to 6
+ newConf = new ClientConfiguration(conf);
+ minNumRacksPerWriteQuorum = 6;
+ ensembleSize = 6;
+ writeQuorumSize = 6;
+ validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs,
minNumRacksPerWriteQuorum, ensembleSize,
+ writeQuorumSize);
+
+ // set MinNumRacksPerWriteQuorum to 6
+ newConf = new ClientConfiguration(conf);
+ minNumRacksPerWriteQuorum = 6;
+ ensembleSize = 10;
+ writeQuorumSize = ensembleSize;
+ validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs,
minNumRacksPerWriteQuorum, ensembleSize,
+ writeQuorumSize);
+
+ // set MinNumRacksPerWriteQuorum to 5
+ newConf = new ClientConfiguration(conf);
+ minNumRacksPerWriteQuorum = 5;
+ ensembleSize = 24;
+ writeQuorumSize = 12;
+ validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs,
minNumRacksPerWriteQuorum, ensembleSize,
+ writeQuorumSize);
+
+ } catch (BKNotEnoughBookiesException bnebe) {
+ fail("Should not get not enough bookies exception even there is
only one rack.");
+ }
+ }
+
+ void
validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress>
addrs,
+ int minNumRacksPerWriteQuorum, int ensembleSize, int
writeQuorumSize) throws Exception {
+ ClientConfiguration newConf = new ClientConfiguration(conf);
+ newConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ ArrayList<BookieSocketAddress> ensemble =
repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
+ new HashSet<>());
+ int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
minNumRacksPerWriteQuorum);
+ assertEquals("minimum number of racks covered for writequorum
ensemble: " + ensemble, ensembleSize, numCovered);
+ }
+
+ @Test
public void testNewEnsembleWithEnoughRacks() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -555,6 +631,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
StaticDNSResolver.addNodeToRack(addr6.getHostName(),
"/default-region/r2");
StaticDNSResolver.addNodeToRack(addr7.getHostName(),
"/default-region/r3");
StaticDNSResolver.addNodeToRack(addr8.getHostName(),
"/default-region/r4");
+ int availableNumOfRacks = 4;
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -567,10 +644,17 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2,
2, null, new HashSet<>());
- assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2,
2, null, new HashSet<>());
- assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
+ int ensembleSize = 3;
+ int writeQuorumSize = 3;
+ int ackQuorumSize = 2;
+ ArrayList<BookieSocketAddress> ensemble1 =
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+ null, new HashSet<>());
+ assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2,
conf.getMinNumRacksPerWriteQuorum()));
+ ensembleSize = 4;
+ writeQuorumSize = 4;
+ ArrayList<BookieSocketAddress> ensemble2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
+ new HashSet<>());
+ assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2,
conf.getMinNumRacksPerWriteQuorum()));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
@@ -791,13 +875,18 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
Set<BookieSocketAddress> excludeList = new
HashSet<BookieSocketAddress>();
ArrayList<BookieSocketAddress> ensemble;
+ int ensembleSize = 3;
+ int writeQuorumSize = 2;
+ int acqQuorumSize = 2;
for (int i = 0; i < numTries; i++) {
// addr2 is on /r2 and this is the only one on this rack. So the
replacement
// will come from other racks. However, the weight should be
honored in such
// selections as well
- ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
- assertTrue("Rackaware selection not happening " +
getNumCoveredWriteQuorums(ensemble, 2),
- getNumCoveredWriteQuorums(ensemble, 2) >= 2);
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, excludeList);
+ assertTrue(
+ "Rackaware selection not happening "
+ + getNumCoveredWriteQuorums(ensemble,
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum()) >= 2);
for (BookieSocketAddress b : ensemble) {
selectionCounts.put(b, selectionCounts.get(b) + 1);
}
@@ -875,8 +964,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
}
}
- static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress>
ensemble, int writeQuorumSize)
- throws Exception {
+ static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress>
ensemble, int writeQuorumSize,
+ int minNumRacksPerWriteQuorumConfValue) throws Exception {
int ensembleSize = ensemble.size();
int numCoveredWriteQuorums = 0;
for (int i = 0; i < ensembleSize; i++) {
@@ -886,7 +975,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
BookieSocketAddress addr = ensemble.get(bookieIdx);
racks.add(StaticDNSResolver.getRack(addr.getHostName()));
}
- numCoveredWriteQuorums += (racks.size() > 1 ? 1 : 0);
+ int numOfRacksToCoverTo = Math.max(Math.min(writeQuorumSize,
minNumRacksPerWriteQuorumConfValue), 2);
+ numCoveredWriteQuorums += (racks.size() >= numOfRacksToCoverTo ? 1
: 0);
}
return numCoveredWriteQuorums;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index a6f28ba..b1ecdba 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -95,14 +95,19 @@ public class TestRackawarePolicyNotificationUpdates extends
TestCase {
StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/rack-2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/rack-2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/rack-2");
+ int numOfAvailableRacks = 2;
// Update cluster
Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3,
addr4);
repp.onClusterChanged(addrs, new HashSet<>());
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
Collections.emptyMap(),
- Collections.emptySet());
- int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+ int ensembleSize = 3;
+ int writeQuorumSize = 2;
+ int acqQuorumSize = 2;
+ ArrayList<BookieSocketAddress> ensemble =
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+ Collections.emptyMap(), Collections.emptySet());
+ int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
+ conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
assertTrue(ensemble.contains(addr1));
@@ -111,9 +116,12 @@ public class TestRackawarePolicyNotificationUpdates
extends TestCase {
bookieAddressList.add(addr2);
rackList.add("/default-region/rack-3");
StaticDNSResolver.changeRack(bookieAddressList, rackList);
-
- ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(),
Collections.emptySet());
- assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+ numOfAvailableRacks = numOfAvailableRacks + 1;
+ acqQuorumSize = 1;
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, Collections.emptyMap(),
+ Collections.emptySet());
+ assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble,
writeQuorumSize,
+ conf.getMinNumRacksPerWriteQuorum()));
assertTrue(ensemble.contains(addr1));
assertTrue(ensemble.contains(addr2));
}
--
To stop receiving notification emails like this one, please contact
[email protected].