sijie closed pull request #1397: Bookies should be from different racks in a 
Writequorum.
URL: https://github.com/apache/bookkeeper/pull/1397
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index def9554eb..32f94f34d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -52,15 +52,17 @@ protected RackawareEnsemblePlacementPolicy 
initialize(DNSToSwitchMapping dnsReso
                                                           int 
stabilizePeriodSeconds,
                                                           boolean isWeighted,
                                                           int 
maxWeightMultiple,
+                                                          int 
minNumRacksPerWriteQuorum,
                                                           StatsLogger 
statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
isWeighted, maxWeightMultiple, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
             slave = new 
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
             slave.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
         } else {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 568debf02..92f87013f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,6 +79,7 @@
     int maxWeightMultiple;
     private Map<BookieNode, WeightedObject> bookieInfoMap = new 
HashMap<BookieNode, WeightedObject>();
     private WeightedRandomSelection<BookieNode> weightedSelection;
+    private int minNumRacksPerWriteQuorum;
 
     public static final String REPP_DNS_RESOLVER_CLASS = 
"reppDnsResolverClass";
     public static final String REPP_RANDOM_READ_REORDERING = 
"ensembleRandomReadReordering";
@@ -233,6 +234,7 @@ protected RackawareEnsemblePlacementPolicyImpl 
initialize(DNSToSwitchMapping dns
                                                               int 
stabilizePeriodSeconds,
                                                               boolean 
isWeighted,
                                                               int 
maxWeightMultiple,
+                                                              int 
minNumRacksPerWriteQuorum,
                                                               StatsLogger 
statsLogger) {
         checkNotNull(statsLogger, "statsLogger should not be null, use 
NullStatsLogger instead.");
         this.statsLogger = statsLogger;
@@ -242,6 +244,7 @@ protected RackawareEnsemblePlacementPolicyImpl 
initialize(DNSToSwitchMapping dns
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> 
this.getDefaultRack());
         this.timer = timer;
+        this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
 
         // create the network topology
         if (stabilizePeriodSeconds > 0) {
@@ -329,6 +332,7 @@ public Long load(BookieSocketAddress key) throws Exception {
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
+                conf.getMinNumRacksPerWriteQuorum(),
                 statsLogger);
     }
 
@@ -512,6 +516,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
         rwLock.readLock().lock();
         try {
             Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
             RRTopologyAwareCoverageEnsemble ensemble =
                     new RRTopologyAwareCoverageEnsemble(
                             ensembleSize,
@@ -519,7 +524,8 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                             ackQuorumSize,
                             RACKNAME_DISTANCE_FROM_LEAVES,
                             parentEnsemble,
-                            parentPredicate);
+                            parentPredicate,
+                            minNumRacksPerWriteQuorumForThisEnsemble);
             BookieNode prevNode = null;
             int numRacks = topology.getNumOfRacks();
             // only one rack, use the random algorithm.
@@ -532,7 +538,9 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                 }
                 return addrs;
             }
-            // pick nodes by racks, to ensure there is at least two racks per 
write quorum.
+            // pick nodes by racks, so that each write quorum tries to span at 
least minNumRacksPerWriteQuorumForThisEnsemble racks.
+            int idx = 0;
+            String[] racks = new String[ensembleSize];
             for (int i = 0; i < ensembleSize; i++) {
                 String curRack;
                 if (null == prevNode) {
@@ -542,9 +550,62 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                         curRack = localNode.getNetworkLocation();
                     }
                 } else {
-                    curRack = "~" + prevNode.getNetworkLocation();
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("~");
+
+                    if (writeQuorumSize > 1) {
+                        /*
+                         * This policy should try to select the bookies of a
+                         * write quorum from at least
+                         * minNumRacksPerWriteQuorumForThisEnsemble different
+                         * racks. To that end, the racks already chosen for
+                         * the
+                         * (minNumRacksPerWriteQuorumForThisEnsemble - 1)
+                         * neighbouring ensemble positions on each side of
+                         * position i (wrapping around the ensemble) are
+                         * added to the exclusion list that follows the
+                         * leading "~" in curRack.
+                         */
+                        for (int j = 1; j < 
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+                            int nextIndex = i + j;
+                            if (nextIndex >= ensembleSize) {
+                                nextIndex %= ensembleSize;
+                            }
+                            /*
+                             * if racks[nextIndex] is null, then it means 
bookie
+                             * is not yet selected for ensemble at 'nextIndex'
+                             * index.
+                             */
+                            if (racks[nextIndex] != null) {
+                                if (!((sb.length() == 1) && (sb.charAt(0) == 
'~'))) {
+                                    
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+                                }
+                                sb.append(racks[nextIndex]);
+                            }
+                        }
+
+                        for (int j = 1; j < 
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+                            int nextIndex = i - j;
+                            if (nextIndex < 0) {
+                                nextIndex += ensembleSize;
+                            }
+                            /*
+                             * if racks[nextIndex] is null, then it means 
bookie
+                             * is not yet selected for ensemble at 'nextIndex'
+                             * index.
+                             */
+                            if (racks[nextIndex] != null) {
+                                if (!((sb.length() == 1) && (sb.charAt(0) == 
'~'))) {
+                                    
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+                                }
+                                sb.append(racks[nextIndex]);
+                            }
+                        }
+                    }
+                    curRack = sb.toString();
                 }
                 prevNode = selectFromNetworkLocation(curRack, excludeNodes, 
ensemble, ensemble);
+                racks[i] = prevNode.getNetworkLocation();
             }
             ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 0b804e768..6e2e7429c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -75,6 +75,7 @@
     protected boolean enableValidation = true;
     protected boolean enforceDurabilityInReplace = false;
     protected Feature disableDurabilityFeature;
+    protected int minNumRacksPerWriteQuorum;
 
     RegionAwareEnsemblePlacementPolicy() {
         super();
@@ -130,7 +131,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
statsLogger)
+                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -178,7 +179,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
             for (String region: regions) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
statsLogger)
+                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = 
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -202,6 +203,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                         conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME,
                                 
BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT));
         }
+        this.minNumRacksPerWriteQuorum = conf.getMinNumRacksPerWriteQuorum();
         return this;
     }
 
@@ -285,7 +287,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                 RRTopologyAwareCoverageEnsemble ensemble = new 
RRTopologyAwareCoverageEnsemble(ensembleSize,
                         writeQuorumSize, ackQuorumSize, 
REGIONID_DISTANCE_FROM_LEAVES,
                         effectiveMinRegionsForDurability > 0 ? new 
HashSet<>(perRegionPlacement.keySet()) : null,
-                        effectiveMinRegionsForDurability);
+                        effectiveMinRegionsForDurability, 
minNumRacksPerWriteQuorum);
                 TopologyAwareEnsemblePlacementPolicy nextPolicy = 
perRegionPlacement.get(
                         availableRegions.iterator().next());
                 return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
writeQuorumSize, excludeBookies, ensemble,
@@ -316,7 +318,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                         // regardless of regions that are available; 
constraints are
                         // always applied based on all possible regions
                         effectiveMinRegionsForDurability > 0 ? new 
HashSet<>(perRegionPlacement.keySet()) : null,
-                        effectiveMinRegionsForDurability);
+                        effectiveMinRegionsForDurability, 
minNumRacksPerWriteQuorum);
                 remainingEnsembleBeforeIteration = remainingEnsemble;
                 int regionsToAllocate = numRemainingRegions;
                 for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: 
regionsWiseAllocation.entrySet()) {
@@ -426,7 +428,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
                 ackQuorumSize,
                 REGIONID_DISTANCE_FROM_LEAVES,
                 effectiveMinRegionsForDurability > 0 ? new 
HashSet<String>(perRegionPlacement.keySet()) : null,
-                effectiveMinRegionsForDurability);
+                effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
 
             BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace);
             if (null == bookieNodeToReplace) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index b70cb0f15..85917b119 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -125,6 +125,11 @@ public String toString() {
         protected class RackQuorumCoverageSet implements CoverageSet {
             HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
             int seenBookies = 0;
+            private final int minNumRacksPerWriteQuorum;
+
+            protected RackQuorumCoverageSet(int minNumRacksPerWriteQuorum) {
+                this.minNumRacksPerWriteQuorum = Math.min(writeQuorumSize, 
minNumRacksPerWriteQuorum);
+            }
 
             @Override
             public boolean apply(BookieNode candidate) {
@@ -134,9 +139,29 @@ public boolean apply(BookieNode candidate) {
                     return true;
                 }
 
-                if (seenBookies + 1 == writeQuorumSize) {
-                    return racksOrRegionsInQuorum.size()
-                        > 
(racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves))
 ? 1 : 0);
+                /*
+                 * Allow the initial
+                 * (writeQuorumSize - minNumRacksPerWriteQuorum + 1) bookies
+                 * to be placed on any rack (including a single rack). After
+                 * that, accept a candidate only if the
+                 * minNumRacksPerWriteQuorum condition can still be met.
+                 */
+                if (seenBookies + minNumRacksPerWriteQuorum - 1 >= 
writeQuorumSize) {
+                    int numRacks = racksOrRegionsInQuorum.size();
+                    if 
(!racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)))
 {
+                        numRacks++;
+                    }
+                    if (numRacks >= minNumRacksPerWriteQuorum
+                            || ((writeQuorumSize - seenBookies - 1) >= 
(minNumRacksPerWriteQuorum - numRacks))) {
+                        /*
+                         * either we have reached our goal or we still have a
+                         * few bookies to be selected with which to catch up to
+                         * the goal
+                         */
+                        return true;
+                    } else {
+                        return false;
+                    }
                 }
                 return true;
             }
@@ -149,7 +174,7 @@ public void addBookie(BookieNode candidate) {
 
             @Override
             public RackQuorumCoverageSet duplicate() {
-                RackQuorumCoverageSet ret = new RackQuorumCoverageSet();
+                RackQuorumCoverageSet ret = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                 ret.racksOrRegionsInQuorum = 
Sets.newHashSet(this.racksOrRegionsInQuorum);
                 ret.seenBookies = this.seenBookies;
                 return ret;
@@ -275,6 +300,7 @@ public void addBookie(BookieNode candidate) {
         final int writeQuorumSize;
         final int ackQuorumSize;
         final int minRacksOrRegionsForDurability;
+        final int minNumRacksPerWriteQuorum;
         final ArrayList<BookieNode> chosenNodes;
         final Set<String> racksOrRegions;
         private final CoverageSet[] quorums;
@@ -303,6 +329,7 @@ protected 
RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that)
                 this.racksOrRegions = null;
             }
             this.minRacksOrRegionsForDurability = 
that.minRacksOrRegionsForDurability;
+            this.minNumRacksPerWriteQuorum = that.minNumRacksPerWriteQuorum;
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -310,9 +337,10 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
                                                   int ackQuorumSize,
                                                   int distanceFromLeaves,
                                                   Set<String> racksOrRegions,
-                                                  int 
minRacksOrRegionsForDurability) {
-            this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, null, null,
-                 racksOrRegions, minRacksOrRegionsForDurability);
+                                                  int 
minRacksOrRegionsForDurability,
+                                                  int 
minNumRacksPerWriteQuorum) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, null, null, racksOrRegions,
+                    minRacksOrRegionsForDurability, minNumRacksPerWriteQuorum);
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -320,9 +348,10 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
                                                   int ackQuorumSize,
                                                   int distanceFromLeaves,
                                                   Ensemble<BookieNode> 
parentEnsemble,
-                                                  Predicate<BookieNode> 
parentPredicate) {
+                                                  Predicate<BookieNode> 
parentPredicate,
+                                                  int 
minNumRacksPerWriteQuorum) {
             this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, parentEnsemble, parentPredicate,
-                 null, 0);
+                 null, 0, minNumRacksPerWriteQuorum);
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -332,7 +361,8 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
                                                   Ensemble<BookieNode> 
parentEnsemble,
                                                   Predicate<BookieNode> 
parentPredicate,
                                                   Set<String> racksOrRegions,
-                                                  int 
minRacksOrRegionsForDurability) {
+                                                  int 
minRacksOrRegionsForDurability,
+                                                  int 
minNumRacksPerWriteQuorum) {
             this.ensembleSize = ensembleSize;
             this.writeQuorumSize = writeQuorumSize;
             this.ackQuorumSize = ackQuorumSize;
@@ -347,6 +377,7 @@ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
             this.parentPredicate = parentPredicate;
             this.racksOrRegions = racksOrRegions;
             this.minRacksOrRegionsForDurability = 
minRacksOrRegionsForDurability;
+            this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
         }
 
         @Override
@@ -377,7 +408,7 @@ public boolean apply(BookieNode candidate, 
Ensemble<BookieNode> ensemble) {
                         if (minRacksOrRegionsForDurability > 0) {
                             quorums[idx] = new 
RackOrRegionDurabilityCoverageSet();
                         } else {
-                            quorums[idx] = new RackQuorumCoverageSet();
+                            quorums[idx] = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                         }
                     }
                     if (!quorums[idx].apply(candidate)) {
@@ -410,7 +441,7 @@ public boolean addNode(BookieNode node) {
                         if (minRacksOrRegionsForDurability > 0) {
                             quorums[idx] = new 
RackOrRegionDurabilityCoverageSet();
                         } else {
-                            quorums[idx] = new RackQuorumCoverageSet();
+                            quorums[idx] = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                         }
                     }
                     quorums[idx].addBookie(node);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 4805c34da..9d6cb50e7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -141,6 +141,9 @@
     // Validate bookie process user
     public static final String PERMITTED_STARTUP_USERS = 
"permittedStartupUsers";
 
+    // minimum number of racks per write quorum
+    public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = 
"minNumRacksPerWriteQuorum";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -779,6 +782,19 @@ public String getTLSEnabledProtocols() {
         return getString(TLS_ENABLED_PROTOCOLS, null);
     }
 
+    /**
+     * Set the minimum number of racks per write quorum.
+     */
+    public void setMinNumRacksPerWriteQuorum(int minNumRacksPerWriteQuorum) {
+        setProperty(MIN_NUM_RACKS_PER_WRITE_QUORUM, minNumRacksPerWriteQuorum);
+    }
+
+    /**
+     * Get the minimum number of racks per write quorum (defaults to 2).
+     */
+    public int getMinNumRacksPerWriteQuorum() {
+        return getInteger(MIN_NUM_RACKS_PER_WRITE_QUORUM, 2);
+    }
 
     /**
      * Trickery to allow inheritance with fluent style.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index f004879f0..d6756f8cc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.net;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +44,7 @@
 
     public static final int DEFAULT_HOST_LEVEL = 2;
     public static final Logger LOG = 
LoggerFactory.getLogger(NetworkTopologyImpl.class);
+    public static final String NODE_SEPARATOR = ",";
 
     /**
      * A marker for an InvalidTopology Exception.
@@ -772,7 +774,11 @@ private Node chooseRandom(String scope, String 
excludedScope) {
         try {
             if (scope.startsWith("~")) {
                 Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
-                Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
+                String[] excludeScopes = 
scope.substring(1).split(NODE_SEPARATOR);
+                Set<Node> excludeNodes = new HashSet<Node>();
+                Arrays.stream(excludeScopes).forEach((excludeScope) -> {
+                    excludeNodes.addAll(doGetLeaves(excludeScope));
+                });
                 allNodes.removeAll(excludeNodes);
                 return allNodes;
             } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1d32c1376..ad9c9c88f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -498,9 +498,9 @@ public void testNewEnsembleWithSingleRack() throws 
Exception {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
+            assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum()));
             ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
+            assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -525,17 +525,93 @@ public void testNewEnsembleWithMultipleRacks() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
+            int ensembleSize = 3;
+            int writeQuorumSize = 2;
+            int acqQuorumSize = 2;
+            ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                    null, new HashSet<>());
+            int numCovered = getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
+            ensembleSize = 4;
+            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                    null, new HashSet<>());
+            numCovered = getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
     }
 
+    @Test
+    public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
+        int numOfRacksToCreate = 6;
+        int numOfNodesInEachRack = 5;
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        BookieSocketAddress addr;
+        for (int i = 0; i < numOfRacksToCreate; i++) {
+            for (int j = 0; j < numOfNodesInEachRack; j++) {
+                addr = new BookieSocketAddress("128.0.0." + ((i * 
numOfNodesInEachRack) + j), 3181);
+                // update dns mapping
+                StaticDNSResolver.addNodeToRack(addr.getHostName(), 
"/default-region/r" + i);
+                addrs.add(addr);
+            }
+        }
+
+        try {
+            ClientConfiguration newConf = new ClientConfiguration(conf);
+            // set MinNumRacksPerWriteQuorum to 4
+            int minNumRacksPerWriteQuorum = 4;
+            int ensembleSize = 12;
+            int writeQuorumSize = 6;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 6
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 6;
+            ensembleSize = 6;
+            writeQuorumSize = 6;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 6
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 6;
+            ensembleSize = 10;
+            writeQuorumSize = ensembleSize;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 5
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 5;
+            ensembleSize = 24;
+            writeQuorumSize = 12;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+        } catch (BKNotEnoughBookiesException bnebe) {
+            fail("Should not get not enough bookies exception even there is 
only one rack.");
+        }
+    }
+
+    void 
validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress> 
addrs,
+            int minNumRacksPerWriteQuorum, int ensembleSize, int 
writeQuorumSize) throws Exception {
+        ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
+                new HashSet<>());
+        int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
minNumRacksPerWriteQuorum);
+        assertEquals("minimum number of racks covered for writequorum 
ensemble: " + ensemble, ensembleSize, numCovered);
+    }
+
     @Test
     public void testNewEnsembleWithEnoughRacks() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -555,6 +631,7 @@ public void testNewEnsembleWithEnoughRacks() throws 
Exception {
         StaticDNSResolver.addNodeToRack(addr6.getHostName(), 
"/default-region/r2");
         StaticDNSResolver.addNodeToRack(addr7.getHostName(), 
"/default-region/r3");
         StaticDNSResolver.addNodeToRack(addr8.getHostName(), 
"/default-region/r4");
+        int availableNumOfRacks = 4;
         // Update cluster
         Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
         addrs.add(addr1);
@@ -567,10 +644,17 @@ public void testNewEnsembleWithEnoughRacks() throws 
Exception {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
+            int ensembleSize = 3;
+            int writeQuorumSize = 3;
+            int ackQuorumSize = 2;
+            ArrayList<BookieSocketAddress> ensemble1 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    null, new HashSet<>());
+            assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2, 
conf.getMinNumRacksPerWriteQuorum()));
+            ensembleSize = 4;
+            writeQuorumSize = 4;
+            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
+                    new HashSet<>());
+            assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -791,13 +875,18 @@ public void 
testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr
 
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
         ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = 3;
+        int writeQuorumSize = 2;
+        int acqQuorumSize = 2;
         for (int i = 0; i < numTries; i++) {
             // addr2 is on /r2 and this is the only one on this rack. So the 
replacement
             // will come from other racks. However, the weight should be 
honored in such
             // selections as well
-            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
-            assertTrue("Rackaware selection not happening " + 
getNumCoveredWriteQuorums(ensemble, 2),
-                    getNumCoveredWriteQuorums(ensemble, 2) >= 2);
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, null, excludeList);
+            assertTrue(
+                    "Rackaware selection not happening "
+                            + getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
conf.getMinNumRacksPerWriteQuorum()) >= 2);
             for (BookieSocketAddress b : ensemble) {
                 selectionCounts.put(b, selectionCounts.get(b) + 1);
             }
@@ -875,8 +964,8 @@ public void 
testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc
         }
     }
 
-    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
-            throws Exception {
+    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize,
+            int minNumRacksPerWriteQuorumConfValue) throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
         for (int i = 0; i < ensembleSize; i++) {
@@ -886,7 +975,8 @@ static int 
getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, in
                 BookieSocketAddress addr = ensemble.get(bookieIdx);
                 racks.add(StaticDNSResolver.getRack(addr.getHostName()));
             }
-            numCoveredWriteQuorums += (racks.size() > 1 ? 1 : 0);
+            int numOfRacksToCoverTo = Math.max(Math.min(writeQuorumSize, 
minNumRacksPerWriteQuorumConfValue), 2);
+            numCoveredWriteQuorums += (racks.size() >= numOfRacksToCoverTo ? 1 
: 0);
         }
         return numCoveredWriteQuorums;
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index a6f28bace..b1ecdba4e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -95,14 +95,19 @@ public void testNotifyRackChange() throws Exception {
         StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/default-region/rack-2");
         StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/default-region/rack-2");
         StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/default-region/rack-2");
+        int numOfAvailableRacks = 2;
 
         // Update cluster
         Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, 
addr4);
         repp.onClusterChanged(addrs, new HashSet<>());
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
Collections.emptyMap(),
-                Collections.emptySet());
-        int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+        int ensembleSize = 3;
+        int writeQuorumSize = 2;
+        int acqQuorumSize = 2;
+        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                Collections.emptyMap(), Collections.emptySet());
+        int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
+                conf.getMinNumRacksPerWriteQuorum());
         assertTrue(numCovered >= 1 && numCovered < 3);
         assertTrue(ensemble.contains(addr1));
 
@@ -111,9 +116,12 @@ public void testNotifyRackChange() throws Exception {
         bookieAddressList.add(addr2);
         rackList.add("/default-region/rack-3");
         StaticDNSResolver.changeRack(bookieAddressList, rackList);
-
-        ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), 
Collections.emptySet());
-        assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+        numOfAvailableRacks = numOfAvailableRacks + 1;
+        acqQuorumSize = 1;
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, Collections.emptyMap(),
+                Collections.emptySet());
+        assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
+                conf.getMinNumRacksPerWriteQuorum()));
         assertTrue(ensemble.contains(addr1));
         assertTrue(ensemble.contains(addr2));
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to