Repository: bookkeeper Updated Branches: refs/heads/master bf4a4d6a0 -> 4383b0b17
BOOKKEEPER-952: Fix RegionAwareEnsemblePlacementPolicy allocate bookies evenly as much as possible across all regions https://issues.apache.org/jira/browse/BOOKKEEPER-952 RB_ID=880463 Author: Yiming Zang <yz...@twitter.com> Reviewers: si...@apache.org <si...@apache.org> Closes #61 from yzang/apache_master and squashes the following commits: a0a9979 [Yiming Zang] fix Inefficient use of keySet iterator instead of entrySet iterator d882c28 [Yiming Zang] fix RegionAwareEnsemblePlacementPolicy test case fix the algorithm for balanced placement across regions. Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/4383b0b1 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/4383b0b1 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/4383b0b1 Branch: refs/heads/master Commit: 4383b0b171d5fdfcdfa55c196998bbc6b7fdc260 Parents: bf4a4d6 Author: Yiming Zang <yz...@twitter.com> Authored: Thu Oct 20 10:38:33 2016 -0700 Committer: Sijie Guo <si...@apache.org> Committed: Thu Oct 20 10:38:33 2016 -0700 ---------------------------------------------------------------------- .../RegionAwareEnsemblePlacementPolicy.java | 31 ++++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/4383b0b1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index abdcb61..181feca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -142,12 +142,12 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme } } - for(String region: perRegionPlacement.keySet()) { - Set<BookieSocketAddress> regionSet = perRegionClusterChange.get(region); + for (Map.Entry<String, TopologyAwareEnsemblePlacementPolicy> regionEntry : perRegionPlacement.entrySet()) { + Set<BookieSocketAddress> regionSet = perRegionClusterChange.get(regionEntry.getKey()); if (null == regionSet) { regionSet = new HashSet<BookieSocketAddress>(); } - perRegionPlacement.get(region).handleBookiesThatJoined(regionSet); + regionEntry.getValue().handleBookiesThatJoined(regionSet); } } @@ -289,12 +289,11 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme regionsWiseAllocation.put(region, Pair.of(0,0)); } int remainingEnsembleBeforeIteration; + int numRemainingRegions; Set<String> regionsReachedMaxAllocation = new HashSet<String>(); RRTopologyAwareCoverageEnsemble ensemble; - int iteration = 0; do { - LOG.info("RegionAwareEnsemblePlacementPolicy#newEnsemble Iteration {}", iteration++); - int numRemainingRegions = numRegionsAvailable - regionsReachedMaxAllocation.size(); + numRemainingRegions = numRegionsAvailable - regionsReachedMaxAllocation.size(); ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, @@ -305,6 +304,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null, effectiveMinRegionsForDurability); remainingEnsembleBeforeIteration = remainingEnsemble; + int regionsToAllocate = numRemainingRegions; for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) { String region = regionEntry.getKey(); final Pair<Integer, Integer> currentAllocation = regionEntry.getValue(); @@ -314,12 +314,11 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme LOG.error("Inconsistent State: This should never happen"); throw new BKException.BKNotEnoughBookiesException(); } - - int addToEnsembleSize = Math.min(remainingEnsemble, (remainingEnsembleBeforeIteration + numRemainingRegions - 1) / numRemainingRegions); + // try to place the bookies as balance as possible across all the regions + int addToEnsembleSize = Math.min(remainingEnsemble, remainingEnsemble / regionsToAllocate + (remainingEnsemble % regionsToAllocate == 0 ? 0 : 1)); boolean success = false; - while(addToEnsembleSize > 0) { + while (addToEnsembleSize > 0) { int addToWriteQuorum = Math.max(1, Math.min(remainingWriteQuorum, Math.round(1.0f * writeQuorumSize * addToEnsembleSize / ensembleSize))); - // Temp ensemble will be merged back into the ensemble only if we are able to successfully allocate // the target number of bookies in this region; if we fail because we dont have enough bookies; then we // retry the process with a smaller target @@ -327,14 +326,15 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme int newEnsembleSize = currentAllocation.getLeft() + addToEnsembleSize; int newWriteQuorumSize = currentAllocation.getRight() + addToWriteQuorum; try { - policyWithinRegion.newEnsemble(newEnsembleSize, newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble, tempEnsemble); + List<BookieSocketAddress> allocated = policyWithinRegion.newEnsemble(newEnsembleSize, newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble, tempEnsemble); ensemble = tempEnsemble; remainingEnsemble -= addToEnsembleSize; - remainingWriteQuorum -= writeQuorumSize; + remainingWriteQuorum -= addToWriteQuorum; regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize)); success = true; - LOG.info("Allocated {} bookies in region {} : {}", - new Object[]{newEnsembleSize, region, ensemble}); + regionsToAllocate--; + LOG.info("Region {} allocating bookies with ensemble size {} and write quorum size {} : {}", + new Object[]{region, newEnsembleSize, newWriteQuorumSize, allocated}); break; } catch (BKException.BKNotEnoughBookiesException exc) { LOG.warn("Could not allocate {} bookies in region {}, try allocating {} bookies", @@ -384,7 +384,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme ensembleSize, bookieList); throw new BKException.BKNotEnoughBookiesException(); } - + LOG.info("Bookies allocated successfully {}", ensemble); return ensemble.toList(); } finally { rwLock.readLock().unlock(); @@ -597,6 +597,5 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme } } return finalList; - } }