sijie closed pull request #1397: Bookies should be from different racks in a Writequorum. URL: https://github.com/apache/bookkeeper/pull/1397
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index def9554eb..32f94f34d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -52,15 +52,17 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso int stabilizePeriodSeconds, boolean isWeighted, int maxWeightMultiple, + int minNumRacksPerWriteQuorum, StatsLogger statsLogger) { if (stabilizePeriodSeconds > 0) { - super.initialize(dnsResolver, timer, reorderReadsRandom, 0, isWeighted, maxWeightMultiple, statsLogger); + super.initialize(dnsResolver, timer, reorderReadsRandom, 0, isWeighted, maxWeightMultiple, + minNumRacksPerWriteQuorum, statsLogger); slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability); slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted, - maxWeightMultiple, statsLogger); + maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger); } else { super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted, - maxWeightMultiple, statsLogger); + maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger); slave = null; } return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 568debf02..92f87013f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -79,6 +79,7 @@ int maxWeightMultiple; private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>(); private WeightedRandomSelection<BookieNode> weightedSelection; + private int minNumRacksPerWriteQuorum; public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass"; public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; @@ -233,6 +234,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns int stabilizePeriodSeconds, boolean isWeighted, int maxWeightMultiple, + int minNumRacksPerWriteQuorum, StatsLogger statsLogger) { checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead."); this.statsLogger = statsLogger; @@ -242,6 +244,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns this.stabilizePeriodSeconds = stabilizePeriodSeconds; this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack()); this.timer = timer; + this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum; // create the network topology if (stabilizePeriodSeconds > 0) { @@ -329,6 +332,7 @@ public Long load(BookieSocketAddress key) throws Exception { conf.getNetworkTopologyStabilizePeriodSeconds(), conf.getDiskWeightBasedPlacementEnabled(), conf.getBookieMaxWeightMultipleForWeightBasedPlacement(), + conf.getMinNumRacksPerWriteQuorum(), statsLogger); } @@ -512,6 +516,7 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { rwLock.readLock().lock(); try { Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); + int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble( ensembleSize, @@ -519,7 +524,8 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { ackQuorumSize, RACKNAME_DISTANCE_FROM_LEAVES, parentEnsemble, - parentPredicate); + parentPredicate, + minNumRacksPerWriteQuorumForThisEnsemble); BookieNode prevNode = null; int numRacks = topology.getNumOfRacks(); // only one rack, use the random algorithm. @@ -532,7 +538,9 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { } return addrs; } - // pick nodes by racks, to ensure there is at least two racks per write quorum. + // pick nodes by racks, to ensure there is at least write quorum number of racks. + int idx = 0; + String[] racks = new String[ensembleSize]; for (int i = 0; i < ensembleSize; i++) { String curRack; if (null == prevNode) { @@ -542,9 +550,62 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { curRack = localNode.getNetworkLocation(); } } else { - curRack = "~" + prevNode.getNetworkLocation(); + StringBuilder sb = new StringBuilder(); + sb.append("~"); + + if (writeQuorumSize > 1) { + /* + * RackAwareEnsemblePlacementPolicy should try to select + * bookies from atleast + * minNumRacksPerWriteQuorumForThisEnsemble number of + * different racks for a write quorum. So in a + * WriteQuorum, bookies should be from + * minNumRacksPerWriteQuorumForThisEnsemble number of + * racks. So we would add racks of + * (minNumRacksPerWriteQuorumForThisEnsemble-1) + * neighbours (both sides) to the exclusion list + * (~curRack). + */ + for (int j = 1; j < minNumRacksPerWriteQuorumForThisEnsemble; j++) { + int nextIndex = i + j; + if (nextIndex >= ensembleSize) { + nextIndex %= ensembleSize; + } + /* + * if racks[nextIndex] is null, then it means bookie + * is not yet selected for ensemble at 'nextIndex' + * index. + */ + if (racks[nextIndex] != null) { + if (!((sb.length() == 1) && (sb.charAt(0) == '~'))) { + sb.append(NetworkTopologyImpl.NODE_SEPARATOR); + } + sb.append(racks[nextIndex]); + } + } + + for (int j = 1; j < minNumRacksPerWriteQuorumForThisEnsemble; j++) { + int nextIndex = i - j; + if (nextIndex < 0) { + nextIndex += ensembleSize; + } + /* + * if racks[nextIndex] is null, then it means bookie + * is not yet selected for ensemble at 'nextIndex' + * index. + */ + if (racks[nextIndex] != null) { + if (!((sb.length() == 1) && (sb.charAt(0) == '~'))) { + sb.append(NetworkTopologyImpl.NODE_SEPARATOR); + } + sb.append(racks[nextIndex]); + } + } + } + curRack = sb.toString(); } prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble); + racks[i] = prevNode.getNetworkLocation(); } ArrayList<BookieSocketAddress> bookieList = ensemble.toList(); if (ensembleSize != bookieList.size()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 0b804e768..6e2e7429c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -75,6 +75,7 @@ protected boolean enableValidation = true; protected boolean enforceDurabilityInReplace = false; protected Feature disableDurabilityFeature; + protected int minNumRacksPerWriteQuorum; RegionAwareEnsemblePlacementPolicy() { super(); @@ -130,7 +131,7 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { if (null == perRegionPlacement.get(region)) { perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy() .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, - this.isWeighted, this.maxWeightMultiple, statsLogger) + this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } @@ -178,7 +179,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, for (String region: regions) { perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true) .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, - this.isWeighted, this.maxWeightMultiple, statsLogger) + this.isWeighted, this.maxWeightMultiple, this.minNumRacksPerWriteQuorum, statsLogger) .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK)); } minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, @@ -202,6 +203,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME, BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT)); } + this.minNumRacksPerWriteQuorum = conf.getMinNumRacksPerWriteQuorum(); return this; } @@ -285,7 +287,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, REGIONID_DISTANCE_FROM_LEAVES, effectiveMinRegionsForDurability > 0 ? new HashSet<>(perRegionPlacement.keySet()) : null, - effectiveMinRegionsForDurability); + effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); TopologyAwareEnsemblePlacementPolicy nextPolicy = perRegionPlacement.get( availableRegions.iterator().next()); return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, excludeBookies, ensemble, @@ -316,7 +318,7 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, // regardless of regions that are available; constraints are // always applied based on all possible regions effectiveMinRegionsForDurability > 0 ? new HashSet<>(perRegionPlacement.keySet()) : null, - effectiveMinRegionsForDurability); + effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); remainingEnsembleBeforeIteration = remainingEnsemble; int regionsToAllocate = numRemainingRegions; for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) { @@ -426,7 +428,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, ackQuorumSize, REGIONID_DISTANCE_FROM_LEAVES, effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null, - effectiveMinRegionsForDurability); + effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum); BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace); if (null == bookieNodeToReplace) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java index b70cb0f15..85917b119 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java @@ -125,6 +125,11 @@ public String toString() { protected class RackQuorumCoverageSet implements CoverageSet { HashSet<String> racksOrRegionsInQuorum = new HashSet<String>(); int seenBookies = 0; + private final int minNumRacksPerWriteQuorum; + + protected RackQuorumCoverageSet(int minNumRacksPerWriteQuorum) { + this.minNumRacksPerWriteQuorum = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum); + } @Override public boolean apply(BookieNode candidate) { @@ -134,9 +139,29 @@ public boolean apply(BookieNode candidate) { return true; } - if (seenBookies + 1 == writeQuorumSize) { - return racksOrRegionsInQuorum.size() - > (racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)) ? 1 : 0); + /* + * allow the initial writeQuorumSize-minRacksToWriteTo+1 bookies + * to be placed on any rack(including on a single rack). But + * after that make sure that with each new bookie chosen, we + * will be able to satisfy the minRackToWriteTo condition + * eventually + */ + if (seenBookies + minNumRacksPerWriteQuorum - 1 >= writeQuorumSize) { + int numRacks = racksOrRegionsInQuorum.size(); + if (!racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves))) { + numRacks++; + } + if (numRacks >= minNumRacksPerWriteQuorum + || ((writeQuorumSize - seenBookies - 1) >= (minNumRacksPerWriteQuorum - numRacks))) { + /* + * either we have reached our goal or we still have a + * few bookies to be selected with which to catch up to + * the goal + */ + return true; + } else { + return false; + } } return true; } @@ -149,7 +174,7 @@ public void addBookie(BookieNode candidate) { @Override public RackQuorumCoverageSet duplicate() { - RackQuorumCoverageSet ret = new RackQuorumCoverageSet(); + RackQuorumCoverageSet ret = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum); ret.racksOrRegionsInQuorum = Sets.newHashSet(this.racksOrRegionsInQuorum); ret.seenBookies = this.seenBookies; return ret; @@ -275,6 +300,7 @@ public void addBookie(BookieNode candidate) { final int writeQuorumSize; final int ackQuorumSize; final int minRacksOrRegionsForDurability; + final int minNumRacksPerWriteQuorum; final ArrayList<BookieNode> chosenNodes; final Set<String> racksOrRegions; private final CoverageSet[] quorums; @@ -303,6 +329,7 @@ protected RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that) this.racksOrRegions = null; } this.minRacksOrRegionsForDurability = that.minRacksOrRegionsForDurability; + this.minNumRacksPerWriteQuorum = that.minNumRacksPerWriteQuorum; } protected RRTopologyAwareCoverageEnsemble(int ensembleSize, @@ -310,9 +337,10 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize, int ackQuorumSize, int distanceFromLeaves, Set<String> racksOrRegions, - int minRacksOrRegionsForDurability) { - this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null, - racksOrRegions, minRacksOrRegionsForDurability); + int minRacksOrRegionsForDurability, + int minNumRacksPerWriteQuorum) { + this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null, racksOrRegions, + minRacksOrRegionsForDurability, minNumRacksPerWriteQuorum); } protected RRTopologyAwareCoverageEnsemble(int ensembleSize, @@ -320,9 +348,10 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize, int ackQuorumSize, int distanceFromLeaves, Ensemble<BookieNode> parentEnsemble, - Predicate<BookieNode> parentPredicate) { + Predicate<BookieNode> parentPredicate, + int minNumRacksPerWriteQuorum) { this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, parentEnsemble, parentPredicate, - null, 0); + null, 0, minNumRacksPerWriteQuorum); } protected RRTopologyAwareCoverageEnsemble(int ensembleSize, @@ -332,7 +361,8 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize, Ensemble<BookieNode> parentEnsemble, Predicate<BookieNode> parentPredicate, Set<String> racksOrRegions, - int minRacksOrRegionsForDurability) { + int minRacksOrRegionsForDurability, + int minNumRacksPerWriteQuorum) { this.ensembleSize = ensembleSize; this.writeQuorumSize = writeQuorumSize; this.ackQuorumSize = ackQuorumSize; @@ -347,6 +377,7 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize, this.parentPredicate = parentPredicate; this.racksOrRegions = racksOrRegions; this.minRacksOrRegionsForDurability = minRacksOrRegionsForDurability; + this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum; } @Override @@ -377,7 +408,7 @@ public boolean apply(BookieNode candidate, Ensemble<BookieNode> ensemble) { if (minRacksOrRegionsForDurability > 0) { quorums[idx] = new RackOrRegionDurabilityCoverageSet(); } else { - quorums[idx] = new RackQuorumCoverageSet(); + quorums[idx] = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum); } } if (!quorums[idx].apply(candidate)) { @@ -410,7 +441,7 @@ public boolean addNode(BookieNode node) { if (minRacksOrRegionsForDurability > 0) { quorums[idx] = new RackOrRegionDurabilityCoverageSet(); } else { - quorums[idx] = new RackQuorumCoverageSet(); + quorums[idx] = new RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum); } } quorums[idx].addBookie(node); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 4805c34da..9d6cb50e7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -141,6 +141,9 @@ // Validate bookie process user public static final String PERMITTED_STARTUP_USERS = "permittedStartupUsers"; + // minimum number of racks per write quorum + public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = "minNumRacksPerWriteQuorum"; + protected AbstractConfiguration() { super(); if (READ_SYSTEM_PROPERTIES) { @@ -779,6 +782,19 @@ public String getTLSEnabledProtocols() { return getString(TLS_ENABLED_PROTOCOLS, null); } + /** + * Set the minimum number of racks per write quorum. + */ + public void setMinNumRacksPerWriteQuorum(int minNumRacksPerWriteQuorum) { + setProperty(MIN_NUM_RACKS_PER_WRITE_QUORUM, minNumRacksPerWriteQuorum); + } + + /** + * Get the minimum number of racks per write quorum. + */ + public int getMinNumRacksPerWriteQuorum() { + return getInteger(MIN_NUM_RACKS_PER_WRITE_QUORUM, 2); + } /** * Trickery to allow inheritance with fluent style. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java index f004879f0..d6756f8cc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java @@ -18,6 +18,7 @@ package org.apache.bookkeeper.net; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -43,6 +44,7 @@ public static final int DEFAULT_HOST_LEVEL = 2; public static final Logger LOG = LoggerFactory.getLogger(NetworkTopologyImpl.class); + public static final String NODE_SEPARATOR = ","; /** * A marker for an InvalidTopology Exception. @@ -772,7 +774,11 @@ private Node chooseRandom(String scope, String excludedScope) { try { if (scope.startsWith("~")) { Set<Node> allNodes = doGetLeaves(NodeBase.ROOT); - Set<Node> excludeNodes = doGetLeaves(scope.substring(1)); + String[] excludeScopes = scope.substring(1).split(NODE_SEPARATOR); + Set<Node> excludeNodes = new HashSet<Node>(); + Arrays.stream(excludeScopes).forEach((excludeScope) -> { + excludeNodes.addAll(doGetLeaves(excludeScope)); + }); allNodes.removeAll(excludeNodes); return allNodes; } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 1d32c1376..ad9c9c88f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -498,9 +498,9 @@ public void testNewEnsembleWithSingleRack() throws Exception { repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); try { ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>()); - assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2)); + assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum())); ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>()); - assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2)); + assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum())); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); } @@ -525,17 +525,93 @@ public void testNewEnsembleWithMultipleRacks() throws Exception { addrs.add(addr4); repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); try { - ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>()); - int numCovered = getNumCoveredWriteQuorums(ensemble, 2); + int ensembleSize = 3; + int writeQuorumSize = 2; + int acqQuorumSize = 2; + ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, + null, new HashSet<>()); + int numCovered = getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum()); assertTrue(numCovered >= 1 && numCovered < 3); - ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>()); - numCovered = getNumCoveredWriteQuorums(ensemble2, 2); + ensembleSize = 4; + ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, + null, new HashSet<>()); + numCovered = getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum()); assertTrue(numCovered >= 1 && numCovered < 3); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); } } + @Test + public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception { + int numOfRacksToCreate = 6; + int numOfNodesInEachRack = 5; + + // Update cluster + Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>(); + BookieSocketAddress addr; + for (int i = 0; i < numOfRacksToCreate; i++) { + for (int j = 0; j < numOfNodesInEachRack; j++) { + addr = new BookieSocketAddress("128.0.0." + ((i * numOfNodesInEachRack) + j), 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr.getHostName(), "/default-region/r" + i); + addrs.add(addr); + } + } + + try { + ClientConfiguration newConf = new ClientConfiguration(conf); + // set MinNumRacksPerWriteQuorum to 4 + int minNumRacksPerWriteQuorum = 4; + int ensembleSize = 12; + int writeQuorumSize = 6; + validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize, + writeQuorumSize); + + // set MinNumRacksPerWriteQuorum to 6 + newConf = new ClientConfiguration(conf); + minNumRacksPerWriteQuorum = 6; + ensembleSize = 6; + writeQuorumSize = 6; + validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize, + writeQuorumSize); + + // set MinNumRacksPerWriteQuorum to 6 + newConf = new ClientConfiguration(conf); + minNumRacksPerWriteQuorum = 6; + ensembleSize = 10; + writeQuorumSize = ensembleSize; + validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize, + writeQuorumSize); + + // set MinNumRacksPerWriteQuorum to 5 + newConf = new ClientConfiguration(conf); + minNumRacksPerWriteQuorum = 5; + ensembleSize = 24; + writeQuorumSize = 12; + validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize, + writeQuorumSize); + + } catch (BKNotEnoughBookiesException bnebe) { + fail("Should not get not enough bookies exception even there is only one rack."); + } + } + + void validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress> addrs, + int minNumRacksPerWriteQuorum, int ensembleSize, int writeQuorumSize) throws Exception { + ClientConfiguration newConf = new ClientConfiguration(conf); + newConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum); + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); + + ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null, + new HashSet<>()); + int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, minNumRacksPerWriteQuorum); + assertEquals("minimum number of racks covered for writequorum ensemble: " + ensemble, ensembleSize, numCovered); + } + @Test public void testNewEnsembleWithEnoughRacks() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); @@ -555,6 +631,7 @@ public void testNewEnsembleWithEnoughRacks() throws Exception { StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r2"); StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r3"); StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r4"); + int availableNumOfRacks = 4; // Update cluster Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>(); addrs.add(addr1); @@ -567,10 +644,17 @@ public void testNewEnsembleWithEnoughRacks() throws Exception { addrs.add(addr8); repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); try { - ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<>()); - assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2)); - ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>()); - assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2)); + int ensembleSize = 3; + int writeQuorumSize = 3; + int ackQuorumSize = 2; + ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, + null, new HashSet<>()); + assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2, conf.getMinNumRacksPerWriteQuorum())); + ensembleSize = 4; + writeQuorumSize = 4; + ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null, + new HashSet<>()); + assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum())); } catch (BKNotEnoughBookiesException bnebe) { fail("Should not get not enough bookies exception even there is only one rack."); } @@ -791,13 +875,18 @@ public void testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>(); ArrayList<BookieSocketAddress> ensemble; + int ensembleSize = 3; + int writeQuorumSize = 2; + int acqQuorumSize = 2; for (int i = 0; i < numTries; i++) { // addr2 is on /r2 and this is the only one on this rack. So the replacement // will come from other racks. However, the weight should be honored in such // selections as well - ensemble = repp.newEnsemble(3, 2, 2, null, excludeList); - assertTrue("Rackaware selection not happening " + getNumCoveredWriteQuorums(ensemble, 2), - getNumCoveredWriteQuorums(ensemble, 2) >= 2); + ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList); + assertTrue( + "Rackaware selection not happening " + + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()), + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()) >= 2); for (BookieSocketAddress b : ensemble) { selectionCounts.put(b, selectionCounts.get(b) + 1); } @@ -875,8 +964,8 @@ public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc } } - static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize) - throws Exception { + static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize, + int minNumRacksPerWriteQuorumConfValue) throws Exception { int ensembleSize = ensemble.size(); int numCoveredWriteQuorums = 0; for (int i = 0; i < ensembleSize; i++) { @@ -886,7 +975,8 @@ static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, in BookieSocketAddress addr = ensemble.get(bookieIdx); racks.add(StaticDNSResolver.getRack(addr.getHostName())); } - numCoveredWriteQuorums += (racks.size() > 1 ? 1 : 0); + int numOfRacksToCoverTo = Math.max(Math.min(writeQuorumSize, minNumRacksPerWriteQuorumConfValue), 2); + numCoveredWriteQuorums += (racks.size() >= numOfRacksToCoverTo ? 1 : 0); } return numCoveredWriteQuorums; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java index a6f28bace..b1ecdba4e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java @@ -95,14 +95,19 @@ public void testNotifyRackChange() throws Exception { StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/rack-2"); StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/rack-2"); StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/rack-2"); + int numOfAvailableRacks = 2; // Update cluster Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, addr4); repp.onClusterChanged(addrs, new HashSet<>()); - ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, Collections.emptyMap(), - Collections.emptySet()); - int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2); + int ensembleSize = 3; + int writeQuorumSize = 2; + int acqQuorumSize = 2; + ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, + Collections.emptyMap(), Collections.emptySet()); + int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum()); assertTrue(numCovered >= 1 && numCovered < 3); assertTrue(ensemble.contains(addr1)); @@ -111,9 +116,12 @@ public void testNotifyRackChange() throws Exception { bookieAddressList.add(addr2); rackList.add("/default-region/rack-3"); StaticDNSResolver.changeRack(bookieAddressList, rackList); - - ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), Collections.emptySet()); - assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2)); + numOfAvailableRacks = numOfAvailableRacks + 1; + acqQuorumSize = 1; + ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, Collections.emptyMap(), + Collections.emptySet()); + assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, writeQuorumSize, + conf.getMinNumRacksPerWriteQuorum())); assertTrue(ensemble.contains(addr1)); assertTrue(ensemble.contains(addr2)); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services