This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ba4c4e  Bookies should be from different racks in a Writequorum.
9ba4c4e is described below

commit 9ba4c4e0d8be770e03110a958fb8b75a65ae0f59
Author: cguttapalem <[email protected]>
AuthorDate: Mon May 21 14:53:43 2018 -0700

    Bookies should be from different racks in a Writequorum.
    
    Descriptions of the changes in this PR:
    
    Ideally, RackAwareEnsemblePlacementPolicy should try to select the
    bookies of a write quorum from different racks. This change adds the
    configuration option minNumRacksPerWriteQuorum (default 2): the
    bookies in each write quorum should come from at least
    min(writeQuorumSize, minNumRacksPerWriteQuorum) distinct racks,
    provided there are enough available racks and bookies.
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1397 from reddycharan/configracks
---
 .../client/RackawareEnsemblePlacementPolicy.java   |   8 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      |  67 ++++++++++-
 .../client/RegionAwareEnsemblePlacementPolicy.java |  12 +-
 .../TopologyAwareEnsemblePlacementPolicy.java      |  55 ++++++++--
 .../bookkeeper/conf/AbstractConfiguration.java     |  16 +++
 .../apache/bookkeeper/net/NetworkTopologyImpl.java |   8 +-
 .../TestRackawareEnsemblePlacementPolicy.java      | 122 ++++++++++++++++++---
 .../TestRackawarePolicyNotificationUpdates.java    |  20 +++-
 8 files changed, 262 insertions(+), 46 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index def9554..32f94f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -52,15 +52,17 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
                                                           int 
stabilizePeriodSeconds,
                                                           boolean isWeighted,
                                                           int 
maxWeightMultiple,
+                                                          int 
minNumRacksPerWriteQuorum,
                                                           StatsLogger 
statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
isWeighted, maxWeightMultiple, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
isWeighted, maxWeightMultiple,
+                    minNumRacksPerWriteQuorum, statsLogger);
             slave = new 
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
             slave.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
         } else {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds, isWeighted,
-                    maxWeightMultiple, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 568debf..92f8701 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,6 +79,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     int maxWeightMultiple;
     private Map<BookieNode, WeightedObject> bookieInfoMap = new 
HashMap<BookieNode, WeightedObject>();
     private WeightedRandomSelection<BookieNode> weightedSelection;
+    private int minNumRacksPerWriteQuorum;
 
     public static final String REPP_DNS_RESOLVER_CLASS = 
"reppDnsResolverClass";
     public static final String REPP_RANDOM_READ_REORDERING = 
"ensembleRandomReadReordering";
@@ -233,6 +234,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                                                               int 
stabilizePeriodSeconds,
                                                               boolean 
isWeighted,
                                                               int 
maxWeightMultiple,
+                                                              int 
minNumRacksPerWriteQuorum,
                                                               StatsLogger 
statsLogger) {
         checkNotNull(statsLogger, "statsLogger should not be null, use 
NullStatsLogger instead.");
         this.statsLogger = statsLogger;
@@ -242,6 +244,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> 
this.getDefaultRack());
         this.timer = timer;
+        this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
 
         // create the network topology
         if (stabilizePeriodSeconds > 0) {
@@ -329,6 +332,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
+                conf.getMinNumRacksPerWriteQuorum(),
                 statsLogger);
     }
 
@@ -512,6 +516,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         rwLock.readLock().lock();
         try {
             Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
             RRTopologyAwareCoverageEnsemble ensemble =
                     new RRTopologyAwareCoverageEnsemble(
                             ensembleSize,
@@ -519,7 +524,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                             ackQuorumSize,
                             RACKNAME_DISTANCE_FROM_LEAVES,
                             parentEnsemble,
-                            parentPredicate);
+                            parentPredicate,
+                            minNumRacksPerWriteQuorumForThisEnsemble);
             BookieNode prevNode = null;
             int numRacks = topology.getNumOfRacks();
             // only one rack, use the random algorithm.
@@ -532,7 +538,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 }
                 return addrs;
             }
-            // pick nodes by racks, to ensure there is at least two racks per 
write quorum.
+            // pick nodes by racks, to ensure there is at least write quorum 
number of racks.
+            int idx = 0;
+            String[] racks = new String[ensembleSize];
             for (int i = 0; i < ensembleSize; i++) {
                 String curRack;
                 if (null == prevNode) {
@@ -542,9 +550,62 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                         curRack = localNode.getNetworkLocation();
                     }
                 } else {
-                    curRack = "~" + prevNode.getNetworkLocation();
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("~");
+
+                    if (writeQuorumSize > 1) {
+                        /*
+                         * RackAwareEnsemblePlacementPolicy should try to 
select
+                         * bookies from atleast
+                         * minNumRacksPerWriteQuorumForThisEnsemble number of
+                         * different racks for a write quorum. So in a
+                         * WriteQuorum, bookies should be from
+                         * minNumRacksPerWriteQuorumForThisEnsemble number of
+                         * racks. So we would add racks of
+                         * (minNumRacksPerWriteQuorumForThisEnsemble-1)
+                         * neighbours (both sides) to the exclusion list
+                         * (~curRack).
+                         */
+                        for (int j = 1; j < 
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+                            int nextIndex = i + j;
+                            if (nextIndex >= ensembleSize) {
+                                nextIndex %= ensembleSize;
+                            }
+                            /*
+                             * if racks[nextIndex] is null, then it means 
bookie
+                             * is not yet selected for ensemble at 'nextIndex'
+                             * index.
+                             */
+                            if (racks[nextIndex] != null) {
+                                if (!((sb.length() == 1) && (sb.charAt(0) == 
'~'))) {
+                                    
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+                                }
+                                sb.append(racks[nextIndex]);
+                            }
+                        }
+
+                        for (int j = 1; j < 
minNumRacksPerWriteQuorumForThisEnsemble; j++) {
+                            int nextIndex = i - j;
+                            if (nextIndex < 0) {
+                                nextIndex += ensembleSize;
+                            }
+                            /*
+                             * if racks[nextIndex] is null, then it means 
bookie
+                             * is not yet selected for ensemble at 'nextIndex'
+                             * index.
+                             */
+                            if (racks[nextIndex] != null) {
+                                if (!((sb.length() == 1) && (sb.charAt(0) == 
'~'))) {
+                                    
sb.append(NetworkTopologyImpl.NODE_SEPARATOR);
+                                }
+                                sb.append(racks[nextIndex]);
+                            }
+                        }
+                    }
+                    curRack = sb.toString();
                 }
                 prevNode = selectFromNetworkLocation(curRack, excludeNodes, 
ensemble, ensemble);
+                racks[i] = prevNode.getNetworkLocation();
             }
             ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 0b804e7..6e2e742 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -75,6 +75,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
     protected boolean enableValidation = true;
     protected boolean enforceDurabilityInReplace = false;
     protected Feature disableDurabilityFeature;
+    protected int minNumRacksPerWriteQuorum;
 
     RegionAwareEnsemblePlacementPolicy() {
         super();
@@ -130,7 +131,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
statsLogger)
+                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -178,7 +179,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
             for (String region: regions) {
                 perRegionPlacement.put(region, new 
RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
-                                this.isWeighted, this.maxWeightMultiple, 
statsLogger)
+                                this.isWeighted, this.maxWeightMultiple, 
this.minNumRacksPerWriteQuorum, statsLogger)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = 
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -202,6 +203,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                         conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME,
                                 
BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT));
         }
+        this.minNumRacksPerWriteQuorum = conf.getMinNumRacksPerWriteQuorum();
         return this;
     }
 
@@ -285,7 +287,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                 RRTopologyAwareCoverageEnsemble ensemble = new 
RRTopologyAwareCoverageEnsemble(ensembleSize,
                         writeQuorumSize, ackQuorumSize, 
REGIONID_DISTANCE_FROM_LEAVES,
                         effectiveMinRegionsForDurability > 0 ? new 
HashSet<>(perRegionPlacement.keySet()) : null,
-                        effectiveMinRegionsForDurability);
+                        effectiveMinRegionsForDurability, 
minNumRacksPerWriteQuorum);
                 TopologyAwareEnsemblePlacementPolicy nextPolicy = 
perRegionPlacement.get(
                         availableRegions.iterator().next());
                 return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
writeQuorumSize, excludeBookies, ensemble,
@@ -316,7 +318,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                         // regardless of regions that are available; 
constraints are
                         // always applied based on all possible regions
                         effectiveMinRegionsForDurability > 0 ? new 
HashSet<>(perRegionPlacement.keySet()) : null,
-                        effectiveMinRegionsForDurability);
+                        effectiveMinRegionsForDurability, 
minNumRacksPerWriteQuorum);
                 remainingEnsembleBeforeIteration = remainingEnsemble;
                 int regionsToAllocate = numRemainingRegions;
                 for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: 
regionsWiseAllocation.entrySet()) {
@@ -426,7 +428,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                 ackQuorumSize,
                 REGIONID_DISTANCE_FROM_LEAVES,
                 effectiveMinRegionsForDurability > 0 ? new 
HashSet<String>(perRegionPlacement.keySet()) : null,
-                effectiveMinRegionsForDurability);
+                effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
 
             BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace);
             if (null == bookieNodeToReplace) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index b70cb0f..85917b1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -125,6 +125,11 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         protected class RackQuorumCoverageSet implements CoverageSet {
             HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
             int seenBookies = 0;
+            private final int minNumRacksPerWriteQuorum;
+
+            protected RackQuorumCoverageSet(int minNumRacksPerWriteQuorum) {
+                this.minNumRacksPerWriteQuorum = Math.min(writeQuorumSize, 
minNumRacksPerWriteQuorum);
+            }
 
             @Override
             public boolean apply(BookieNode candidate) {
@@ -134,9 +139,29 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                     return true;
                 }
 
-                if (seenBookies + 1 == writeQuorumSize) {
-                    return racksOrRegionsInQuorum.size()
-                        > 
(racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves))
 ? 1 : 0);
+                /*
+                 * allow the initial writeQuorumSize-minRacksToWriteTo+1 
bookies
+                 * to be placed on any rack(including on a single rack). But
+                 * after that make sure that with each new bookie chosen, we
+                 * will be able to satisfy the minRackToWriteTo condition
+                 * eventually
+                 */
+                if (seenBookies + minNumRacksPerWriteQuorum - 1 >= 
writeQuorumSize) {
+                    int numRacks = racksOrRegionsInQuorum.size();
+                    if 
(!racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)))
 {
+                        numRacks++;
+                    }
+                    if (numRacks >= minNumRacksPerWriteQuorum
+                            || ((writeQuorumSize - seenBookies - 1) >= 
(minNumRacksPerWriteQuorum - numRacks))) {
+                        /*
+                         * either we have reached our goal or we still have a
+                         * few bookies to be selected with which to catch up to
+                         * the goal
+                         */
+                        return true;
+                    } else {
+                        return false;
+                    }
                 }
                 return true;
             }
@@ -149,7 +174,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
 
             @Override
             public RackQuorumCoverageSet duplicate() {
-                RackQuorumCoverageSet ret = new RackQuorumCoverageSet();
+                RackQuorumCoverageSet ret = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                 ret.racksOrRegionsInQuorum = 
Sets.newHashSet(this.racksOrRegionsInQuorum);
                 ret.seenBookies = this.seenBookies;
                 return ret;
@@ -275,6 +300,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         final int writeQuorumSize;
         final int ackQuorumSize;
         final int minRacksOrRegionsForDurability;
+        final int minNumRacksPerWriteQuorum;
         final ArrayList<BookieNode> chosenNodes;
         final Set<String> racksOrRegions;
         private final CoverageSet[] quorums;
@@ -303,6 +329,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                 this.racksOrRegions = null;
             }
             this.minRacksOrRegionsForDurability = 
that.minRacksOrRegionsForDurability;
+            this.minNumRacksPerWriteQuorum = that.minNumRacksPerWriteQuorum;
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -310,9 +337,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                                                   int ackQuorumSize,
                                                   int distanceFromLeaves,
                                                   Set<String> racksOrRegions,
-                                                  int 
minRacksOrRegionsForDurability) {
-            this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, null, null,
-                 racksOrRegions, minRacksOrRegionsForDurability);
+                                                  int 
minRacksOrRegionsForDurability,
+                                                  int 
minNumRacksPerWriteQuorum) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, null, null, racksOrRegions,
+                    minRacksOrRegionsForDurability, minNumRacksPerWriteQuorum);
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -320,9 +348,10 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                                                   int ackQuorumSize,
                                                   int distanceFromLeaves,
                                                   Ensemble<BookieNode> 
parentEnsemble,
-                                                  Predicate<BookieNode> 
parentPredicate) {
+                                                  Predicate<BookieNode> 
parentPredicate,
+                                                  int 
minNumRacksPerWriteQuorum) {
             this(ensembleSize, writeQuorumSize, ackQuorumSize, 
distanceFromLeaves, parentEnsemble, parentPredicate,
-                 null, 0);
+                 null, 0, minNumRacksPerWriteQuorum);
         }
 
         protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
@@ -332,7 +361,8 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                                                   Ensemble<BookieNode> 
parentEnsemble,
                                                   Predicate<BookieNode> 
parentPredicate,
                                                   Set<String> racksOrRegions,
-                                                  int 
minRacksOrRegionsForDurability) {
+                                                  int 
minRacksOrRegionsForDurability,
+                                                  int 
minNumRacksPerWriteQuorum) {
             this.ensembleSize = ensembleSize;
             this.writeQuorumSize = writeQuorumSize;
             this.ackQuorumSize = ackQuorumSize;
@@ -347,6 +377,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
             this.parentPredicate = parentPredicate;
             this.racksOrRegions = racksOrRegions;
             this.minRacksOrRegionsForDurability = 
minRacksOrRegionsForDurability;
+            this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
         }
 
         @Override
@@ -377,7 +408,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                         if (minRacksOrRegionsForDurability > 0) {
                             quorums[idx] = new 
RackOrRegionDurabilityCoverageSet();
                         } else {
-                            quorums[idx] = new RackQuorumCoverageSet();
+                            quorums[idx] = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                         }
                     }
                     if (!quorums[idx].apply(candidate)) {
@@ -410,7 +441,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
                         if (minRacksOrRegionsForDurability > 0) {
                             quorums[idx] = new 
RackOrRegionDurabilityCoverageSet();
                         } else {
-                            quorums[idx] = new RackQuorumCoverageSet();
+                            quorums[idx] = new 
RackQuorumCoverageSet(this.minNumRacksPerWriteQuorum);
                         }
                     }
                     quorums[idx].addBookie(node);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 4805c34..9d6cb50 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -141,6 +141,9 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     // Validate bookie process user
     public static final String PERMITTED_STARTUP_USERS = 
"permittedStartupUsers";
 
+    // minimum number of racks per write quorum
+    public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = 
"minNumRacksPerWriteQuorum";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -779,6 +782,19 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
         return getString(TLS_ENABLED_PROTOCOLS, null);
     }
 
+    /**
+     * Set the minimum number of racks per write quorum.
+     */
+    public void setMinNumRacksPerWriteQuorum(int minNumRacksPerWriteQuorum) {
+        setProperty(MIN_NUM_RACKS_PER_WRITE_QUORUM, minNumRacksPerWriteQuorum);
+    }
+
+    /**
+     * Get the minimum number of racks per write quorum.
+     */
+    public int getMinNumRacksPerWriteQuorum() {
+        return getInteger(MIN_NUM_RACKS_PER_WRITE_QUORUM, 2);
+    }
 
     /**
      * Trickery to allow inheritance with fluent style.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index f004879..d6756f8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.net;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +44,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
 
     public static final int DEFAULT_HOST_LEVEL = 2;
     public static final Logger LOG = 
LoggerFactory.getLogger(NetworkTopologyImpl.class);
+    public static final String NODE_SEPARATOR = ",";
 
     /**
      * A marker for an InvalidTopology Exception.
@@ -772,7 +774,11 @@ public class NetworkTopologyImpl implements 
NetworkTopology {
         try {
             if (scope.startsWith("~")) {
                 Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
-                Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
+                String[] excludeScopes = 
scope.substring(1).split(NODE_SEPARATOR);
+                Set<Node> excludeNodes = new HashSet<Node>();
+                Arrays.stream(excludeScopes).forEach((excludeScope) -> {
+                    excludeNodes.addAll(doGetLeaves(excludeScope));
+                });
                 allNodes.removeAll(excludeNodes);
                 return allNodes;
             } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 1d32c13..ad9c9c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -498,9 +498,9 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
+            assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum()));
             ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
+            assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -525,11 +525,17 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
+            int ensembleSize = 3;
+            int writeQuorumSize = 2;
+            int acqQuorumSize = 2;
+            ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                    null, new HashSet<>());
+            int numCovered = getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
+            ensembleSize = 4;
+            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                    null, new HashSet<>());
+            numCovered = getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -537,6 +543,76 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
     }
 
     @Test
+    public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
+        int numOfRacksToCreate = 6;
+        int numOfNodesInEachRack = 5;
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        BookieSocketAddress addr;
+        for (int i = 0; i < numOfRacksToCreate; i++) {
+            for (int j = 0; j < numOfNodesInEachRack; j++) {
+                addr = new BookieSocketAddress("128.0.0." + ((i * 
numOfNodesInEachRack) + j), 3181);
+                // update dns mapping
+                StaticDNSResolver.addNodeToRack(addr.getHostName(), 
"/default-region/r" + i);
+                addrs.add(addr);
+            }
+        }
+
+        try {
+            ClientConfiguration newConf = new ClientConfiguration(conf);
+            // set MinNumRacksPerWriteQuorum to 4
+            int minNumRacksPerWriteQuorum = 4;
+            int ensembleSize = 12;
+            int writeQuorumSize = 6;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 6
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 6;
+            ensembleSize = 6;
+            writeQuorumSize = 6;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 6
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 6;
+            ensembleSize = 10;
+            writeQuorumSize = ensembleSize;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+            // set MinNumRacksPerWriteQuorum to 5
+            newConf = new ClientConfiguration(conf);
+            minNumRacksPerWriteQuorum = 5;
+            ensembleSize = 24;
+            writeQuorumSize = 12;
+            validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, 
minNumRacksPerWriteQuorum, ensembleSize,
+                    writeQuorumSize);
+
+        } catch (BKNotEnoughBookiesException bnebe) {
+            fail("Should not get not enough bookies exception even there is 
only one rack.");
+        }
+    }
+
+    void 
validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress> 
addrs,
+            int minNumRacksPerWriteQuorum, int ensembleSize, int 
writeQuorumSize) throws Exception {
+        ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
+                new HashSet<>());
+        int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
minNumRacksPerWriteQuorum);
+        assertEquals("minimum number of racks covered for writequorum 
ensemble: " + ensemble, ensembleSize, numCovered);
+    }
+
+    @Test
     public void testNewEnsembleWithEnoughRacks() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
@@ -555,6 +631,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         StaticDNSResolver.addNodeToRack(addr6.getHostName(), 
"/default-region/r2");
         StaticDNSResolver.addNodeToRack(addr7.getHostName(), 
"/default-region/r3");
         StaticDNSResolver.addNodeToRack(addr8.getHostName(), 
"/default-region/r4");
+        int availableNumOfRacks = 4;
         // Update cluster
         Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
         addrs.add(addr1);
@@ -567,10 +644,17 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
-            assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
-            assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
+            int ensembleSize = 3;
+            int writeQuorumSize = 3;
+            int ackQuorumSize = 2;
+            ArrayList<BookieSocketAddress> ensemble1 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    null, new HashSet<>());
+            assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2, 
conf.getMinNumRacksPerWriteQuorum()));
+            ensembleSize = 4;
+            writeQuorumSize = 4;
+            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
+                    new HashSet<>());
+            assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -791,13 +875,18 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
         ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = 3;
+        int writeQuorumSize = 2;
+        int acqQuorumSize = 2;
         for (int i = 0; i < numTries; i++) {
             // addr2 is on /r2 and this is the only one on this rack. So the 
replacement
             // will come from other racks. However, the weight should be 
honored in such
             // selections as well
-            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
-            assertTrue("Rackaware selection not happening " + 
getNumCoveredWriteQuorums(ensemble, 2),
-                    getNumCoveredWriteQuorums(ensemble, 2) >= 2);
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, null, excludeList);
+            assertTrue(
+                    "Rackaware selection not happening "
+                            + getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
conf.getMinNumRacksPerWriteQuorum()) >= 2);
             for (BookieSocketAddress b : ensemble) {
                 selectionCounts.put(b, selectionCounts.get(b) + 1);
             }
@@ -875,8 +964,8 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         }
     }
 
-    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
-            throws Exception {
+    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize,
+            int minNumRacksPerWriteQuorumConfValue) throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
         for (int i = 0; i < ensembleSize; i++) {
@@ -886,7 +975,8 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
                 BookieSocketAddress addr = ensemble.get(bookieIdx);
                 racks.add(StaticDNSResolver.getRack(addr.getHostName()));
             }
-            numCoveredWriteQuorums += (racks.size() > 1 ? 1 : 0);
+            int numOfRacksToCoverTo = Math.max(Math.min(writeQuorumSize, 
minNumRacksPerWriteQuorumConfValue), 2);
+            numCoveredWriteQuorums += (racks.size() >= numOfRacksToCoverTo ? 1 
: 0);
         }
         return numCoveredWriteQuorums;
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index a6f28ba..b1ecdba 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -95,14 +95,19 @@ public class TestRackawarePolicyNotificationUpdates extends 
TestCase {
         StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/default-region/rack-2");
         StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/default-region/rack-2");
         StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/default-region/rack-2");
+        int numOfAvailableRacks = 2;
 
         // Update cluster
         Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, 
addr4);
         repp.onClusterChanged(addrs, new HashSet<>());
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
Collections.emptyMap(),
-                Collections.emptySet());
-        int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+        int ensembleSize = 3;
+        int writeQuorumSize = 2;
+        int acqQuorumSize = 2;
+        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+                Collections.emptyMap(), Collections.emptySet());
+        int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
+                conf.getMinNumRacksPerWriteQuorum());
         assertTrue(numCovered >= 1 && numCovered < 3);
         assertTrue(ensemble.contains(addr1));
 
@@ -111,9 +116,12 @@ public class TestRackawarePolicyNotificationUpdates 
extends TestCase {
         bookieAddressList.add(addr2);
         rackList.add("/default-region/rack-3");
         StaticDNSResolver.changeRack(bookieAddressList, rackList);
-
-        ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), 
Collections.emptySet());
-        assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+        numOfAvailableRacks = numOfAvailableRacks + 1;
+        acqQuorumSize = 1;
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, Collections.emptyMap(),
+                Collections.emptySet());
+        assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
+                conf.getMinNumRacksPerWriteQuorum()));
         assertTrue(ensemble.contains(addr1));
         assertTrue(ensemble.contains(addr2));
     }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to