This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 49135b6  Exclude defaultrack bookies when 
enforceMinNumRacksPerWriteQuorum is enabled
49135b6 is described below

commit 49135b6b48a45a5fbc1fd8696dd595e4dc832da8
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Thu Feb 21 12:41:14 2019 -0800

    Exclude defaultrack bookies when enforceMinNumRacksPerWriteQuorum is enabled
    
    
    Descriptions of the changes in this PR:
    
    - enforceMinNumRacksPerWriteQuorum is meant to be used for a strict
    placement policy, so when it is enabled, bookies that belong to the
    default faultzone/rack (because resolving their network location
    failed) should be excluded from bookie selection.
    - add a gauge for the number of writable bookies in the default
    faultzone/rack. It will be helpful for creating alerts based on this
    gauge.
    - add a gauge for the number of ledgers found not adhering to the
    strict placement policy in the Auditor's placement policy check. This
    gauge is more helpful for creating alerts than the previous
    monotonically increasing counter.
    
    
    Reviewers: Enrico Olivelli <[email protected]>, Pasha Kuznetsov <None>, 
Sijie Guo <[email protected]>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1941 from reddycharan/ignoredefaultzoneandfixmetrics
---
 .../apache/bookkeeper/test/TestStatsProvider.java  |   2 +-
 .../bookkeeper/client/BookKeeperClientStats.java   |   3 +
 .../bookkeeper/client/EnsemblePlacementPolicy.java |   6 +
 .../RackawareEnsemblePlacementPolicyImpl.java      | 121 +++++++---
 .../client/RegionAwareEnsemblePlacementPolicy.java |  25 ++-
 .../TopologyAwareEnsemblePlacementPolicy.java      |   2 -
 .../org/apache/bookkeeper/net/NetworkTopology.java |  11 +
 .../apache/bookkeeper/net/NetworkTopologyImpl.java |  12 +-
 .../bookkeeper/net/StabilizeNetworkTopology.java   |   6 +
 .../org/apache/bookkeeper/replication/Auditor.java |  61 ++++-
 .../bookkeeper/replication/ReplicationStats.java   |   3 +-
 .../TestRackawareEnsemblePlacementPolicy.java      | 248 +++++++++++++++++++--
 .../AuditorPlacementPolicyCheckTest.java           |  28 ++-
 13 files changed, 427 insertions(+), 101 deletions(-)

diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index bef0dfe..624791c 100644
--- 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -177,7 +177,7 @@ public class TestStatsProvider implements StatsProvider {
         }
 
         public Gauge<? extends Number> getGauge(String name) {
-            return gaugeMap.get(path);
+            return gaugeMap.get(getSubPath(name));
         }
 
         @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 8b358b6..cdfde67 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -93,6 +93,9 @@ public interface BookKeeperClientStats {
     String FAILED_CONNECTION_COUNTER = "FAILED_CONNECTION_COUNTER";
     String FAILED_TLS_HANDSHAKE_COUNTER = "FAILED_TLS_HANDSHAKE_COUNTER";
 
+    // placementpolicy stats
+    String NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK = 
"NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK";
+
     OpStatsLogger getCreateOpLogger();
     OpStatsLogger getOpenOpLogger();
     OpStatsLogger getDeleteOpLogger();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index fad3f92..7dc8111 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -251,6 +251,9 @@ public interface EnsemblePlacementPolicy {
      * <p>{@code customMetadata} is the same user defined data that user 
provides
      * when {@link BookKeeper#createLedger(int, int, int, 
BookKeeper.DigestType, byte[], Map)}.
      *
+     * <p>If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the 
bookies belonging to default
+     * faultzone (rack) will be excluded while selecting bookies.
+     *
      * @param ensembleSize
      *          Ensemble Size
      * @param writeQuorumSize
@@ -274,6 +277,9 @@ public interface EnsemblePlacementPolicy {
      * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie 
available in the cluster,
      * {@link BKNotEnoughBookiesException} is thrown.
      *
+     * <p>If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the 
bookies belonging to default
+     * faultzone (rack) will be excluded while selecting bookies.
+     *
      * @param ensembleSize
      *          the value of ensembleSize
      * @param writeQuorumSize
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 304db72..ef4341a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
+import static 
org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK;
 import static 
org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
 import static 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
 
@@ -67,6 +68,7 @@ import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.net.StabilizeNetworkTopology;
 import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
@@ -114,11 +116,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
 
         final Supplier<String> defaultRackSupplier;
 
-        // for backwards compat
-        public DefaultResolver() {
-            this(() -> NetworkTopology.DEFAULT_REGION_AND_RACK);
-        }
-
         public DefaultResolver(Supplier<String> defaultRackSupplier) {
             checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
             this.defaultRackSupplier = defaultRackSupplier;
@@ -240,7 +237,16 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         help = "The distribution of number of bookies reordered on each read 
request"
     )
     protected OpStatsLogger readReorderedCounter = null;
+    @StatsDoc(
+            name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+            help = "Counter for number of times DNSResolverDecorator failed to 
resolve Network Location"
+    )
     protected Counter failedToResolveNetworkLocationCounter = null;
+    @StatsDoc(
+            name = NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK,
+            help = "Gauge for the number of writable Bookies in default rack"
+    )
+    protected Gauge<Integer> numWritableBookiesInDefaultRack;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -282,8 +288,24 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         this.bookiesJoinedCounter = 
statsLogger.getOpStatsLogger(BOOKIES_JOINED);
         this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
         this.readReorderedCounter = 
statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
-        this.failedToResolveNetworkLocationCounter = statsLogger
-                .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
+        this.failedToResolveNetworkLocationCounter = 
statsLogger.getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
+        this.numWritableBookiesInDefaultRack = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                rwLock.readLock().lock();
+                try {
+                    return topology.countNumOfAvailableNodes(getDefaultRack(), 
Collections.emptySet());
+                } finally {
+                    rwLock.readLock().unlock();
+                }
+            }
+        };
+        this.statsLogger.registerGauge(NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK, 
numWritableBookiesInDefaultRack);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
         this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
@@ -417,7 +439,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 if (node != null) {
                     // refresh the rack info if its a known bookie
                     topology.remove(node);
-                    topology.add(createBookieNode(bookieAddress));
+                    BookieNode newNode = createBookieNode(bookieAddress);
+                    topology.add(newNode);
+                    knownBookies.put(bookieAddress, newNode);
                 }
             }
         } finally {
@@ -455,6 +479,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         }
     }
 
+    /*
+     * this method should be called in writelock scope of 'rwLock'
+     */
     @Override
     public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
         for (BookieSocketAddress addr : leftBookies) {
@@ -483,6 +510,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         }
     }
 
+    /*
+     * this method should be called in writelock scope of 'rwLock'
+     */
     @Override
     public void handleBookiesThatJoined(Set<BookieSocketAddress> 
joinedBookies) {
         // node joined
@@ -531,26 +561,53 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         return networkLocs;
     }
 
+    /*
+     * this method should be called in readlock scope of 'rwLock'
+     */
+    protected Set<BookieSocketAddress> 
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+            Set<BookieSocketAddress> excludeBookies) {
+        Set<BookieSocketAddress> comprehensiveExclusionBookiesSet;
+        if (enforceMinNumRacksPerWriteQuorum) {
+            Set<BookieSocketAddress> bookiesInDefaultRack = null;
+            Set<Node> defaultRackLeaves = topology.getLeaves(getDefaultRack());
+            for (Node node : defaultRackLeaves) {
+                if (node instanceof BookieNode) {
+                    if (bookiesInDefaultRack == null) {
+                        bookiesInDefaultRack = new 
HashSet<BookieSocketAddress>(excludeBookies);
+                    }
+                    bookiesInDefaultRack.add(((BookieNode) node).getAddr());
+                } else {
+                    LOG.error("found non-BookieNode: {} as leaf of 
defaultrack: {}", node, getDefaultRack());
+                }
+            }
+            if ((bookiesInDefaultRack == null) || 
bookiesInDefaultRack.isEmpty()) {
+                comprehensiveExclusionBookiesSet = excludeBookies;
+            } else {
+                comprehensiveExclusionBookiesSet = new 
HashSet<BookieSocketAddress>(excludeBookies);
+                comprehensiveExclusionBookiesSet.addAll(bookiesInDefaultRack);
+                LOG.info("enforceMinNumRacksPerWriteQuorum is enabled, so 
Excluding bookies of defaultRack: {}",
+                        bookiesInDefaultRack);
+            }
+        } else {
+            comprehensiveExclusionBookiesSet = excludeBookies;
+        }
+        return comprehensiveExclusionBookiesSet;
+    }
+
     @Override
     public PlacementResult<List<BookieSocketAddress>> newEnsemble(int 
ensembleSize, int writeQuorumSize,
             int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
-        return newEnsembleInternal(ensembleSize, writeQuorumSize, 
excludeBookies, null, null);
-    }
-
-    protected PlacementResult<List<BookieSocketAddress>> 
newEnsembleInternal(int ensembleSize,
-                                                                             
int writeQuorumSize,
-                                                                             
Set<BookieSocketAddress> excludeBookies,
-                                                                             
Ensemble<BookieNode> parentEnsemble,
-                                                                             
Predicate<BookieNode> parentPredicate)
-            throws BKNotEnoughBookiesException {
-        return newEnsembleInternal(
-                ensembleSize,
-                writeQuorumSize,
-                writeQuorumSize,
-                excludeBookies,
-                parentEnsemble,
-                parentPredicate);
+        rwLock.readLock().lock();
+        try {
+            Set<BookieSocketAddress> comprehensiveExclusionBookiesSet = 
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+                    excludeBookies);
+            PlacementResult<List<BookieSocketAddress>> newEnsembleResult = 
newEnsembleInternal(ensembleSize,
+                    writeQuorumSize, ackQuorumSize, 
comprehensiveExclusionBookiesSet, null, null);
+            return newEnsembleResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     @Override
@@ -643,6 +700,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
             throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
         try {
+            excludeBookies = 
addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies);
             excludeBookies.addAll(currentEnsemble);
             BookieNode bn = knownBookies.get(bookieToReplace);
             if (null == bn) {
@@ -1253,28 +1311,31 @@ public class RackawareEnsemblePlacementPolicyImpl 
extends TopologyAwareEnsembleP
         }
     }
 
+    // this method should be called in readlock scope of 'rwlock'
     @Override
     public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
             int ackQuorumSize) {
         int ensembleSize = ensembleList.size();
         int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
-        HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+        HashSet<String> racksInQuorum = new HashSet<String>();
         BookieSocketAddress bookie;
         for (int i = 0; i < ensembleList.size(); i++) {
-            racksOrRegionsInQuorum.clear();
+            racksInQuorum.clear();
             for (int j = 0; j < writeQuorumSize; j++) {
                 bookie = ensembleList.get((i + j) % ensembleSize);
                 try {
-                    
racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+                    
racksInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
                 } catch (Exception e) {
                     /*
-                     * any issue/exception in analyzing whether ensemble is 
strictly adhering to
-                     * placement policy should be swallowed.
+                     * any issue/exception in analyzing whether ensemble is
+                     * strictly adhering to placement policy should be
+                     * swallowed.
                      */
                     LOG.warn("Received exception while trying to get network 
location of bookie: {}", bookie, e);
                 }
             }
-            if (racksOrRegionsInQuorum.size() < 
minNumRacksPerWriteQuorumForThisEnsemble) {
+            if ((racksInQuorum.size() < 
minNumRacksPerWriteQuorumForThisEnsemble)
+                    || (enforceMinNumRacksPerWriteQuorum && 
racksInQuorum.contains(getDefaultRack()))) {
                 return false;
             }
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index cd80fdf..399e8aa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -228,7 +228,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
     @Override
     public PlacementResult<List<BookieSocketAddress>> newEnsemble(int 
ensembleSize, int writeQuorumSize,
-            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludedBookies)
             throws BKException.BKNotEnoughBookiesException {
 
         int effectiveMinRegionsForDurability = 
disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -257,7 +257,9 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
         rwLock.readLock().lock();
         try {
-            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            Set<BookieSocketAddress> comprehensiveExclusionBookiesSet = 
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+                    excludedBookies);
+            Set<Node> excludeNodes = 
convertBookiesToNodes(comprehensiveExclusionBookiesSet);
             Set<String> availableRegions = new HashSet<String>();
             for (String region: perRegionPlacement.keySet()) {
                 if ((null == disallowBookiePlacementInRegionFeatureName)
@@ -294,8 +296,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                         effectiveMinRegionsForDurability, 
minNumRacksPerWriteQuorum);
                 TopologyAwareEnsemblePlacementPolicy nextPolicy = 
perRegionPlacement.get(
                         availableRegions.iterator().next());
-                return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
writeQuorumSize, excludeBookies, ensemble,
-                        ensemble);
+                return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
writeQuorumSize,
+                        comprehensiveExclusionBookiesSet, ensemble, ensemble);
             }
 
             int remainingEnsemble = ensembleSize;
@@ -349,9 +351,10 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                             int newEnsembleSize = currentAllocation.getLeft() 
+ addToEnsembleSize;
                             int newWriteQuorumSize = 
currentAllocation.getRight() + addToWriteQuorum;
                             try {
-                                List<BookieSocketAddress> allocated = 
policyWithinRegion.newEnsemble(newEnsembleSize,
-                                        newWriteQuorumSize, 
newWriteQuorumSize, excludeBookies, tempEnsemble,
-                                        tempEnsemble).getResult();
+                                List<BookieSocketAddress> allocated = 
policyWithinRegion
+                                        .newEnsemble(newEnsembleSize, 
newWriteQuorumSize, newWriteQuorumSize,
+                                                
comprehensiveExclusionBookiesSet, tempEnsemble, tempEnsemble)
+                                        .getResult();
                                 ensemble = tempEnsemble;
                                 remainingEnsemble -= addToEnsembleSize;
                                 remainingWriteQuorum -= addToWriteQuorum;
@@ -379,12 +382,12 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                     if (regionsReachedMaxAllocation.contains(region)) {
                         if (currentAllocation.getLeft() > 0) {
                             LOG.info("Allocating {} bookies in region {} : 
ensemble {} exclude {}",
-                                    currentAllocation.getLeft(), region, 
excludeBookies, ensemble);
+                                    currentAllocation.getLeft(), region, 
comprehensiveExclusionBookiesSet, ensemble);
                             policyWithinRegion.newEnsemble(
                                     currentAllocation.getLeft(),
                                     currentAllocation.getRight(),
                                     currentAllocation.getRight(),
-                                    excludeBookies,
+                                    comprehensiveExclusionBookiesSet,
                                     ensemble,
                                     ensemble);
                             LOG.info("Allocated {} bookies in region {} : {}",
@@ -428,7 +431,9 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
         try {
             boolean enforceDurability = enforceDurabilityInReplace && 
!disableDurabilityFeature.isAvailable();
             int effectiveMinRegionsForDurability = enforceDurability ? 
minRegionsForDurability : 1;
-            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            Set<BookieSocketAddress> comprehensiveExclusionBookiesSet = 
addDefaultRackBookiesIfMinNumRacksIsEnforced(
+                    excludeBookies);
+            Set<Node> excludeNodes = 
convertBookiesToNodes(comprehensiveExclusionBookiesSet);
             RRTopologyAwareCoverageEnsemble ensemble = new 
RRTopologyAwareCoverageEnsemble(ensembleSize,
                 writeQuorumSize,
                 ackQuorumSize,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 19cb505..7fa7555 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -78,7 +78,6 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
     }
 
     protected static class BookieNode extends NodeBase {
-
         private final BookieSocketAddress addr; // identifier of a bookie node.
 
         BookieNode(BookieSocketAddress addr, String networkLoc) {
@@ -108,7 +107,6 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         public String toString() {
             return String.format("<Bookie:%s>", name);
         }
-
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
index a6bcf77..d2d37ea 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.net;
 
+import java.util.Collection;
 import java.util.Set;
 
 /**
@@ -76,4 +77,14 @@ public interface NetworkTopology {
      */
     Set<Node> getLeaves(String loc);
 
+    /**
+     * Return the number of leaves in <i>scope</i> but not in 
<i>excludedNodes</i>.
+     *
+     * <p>If scope starts with ~, return the number of nodes that are not
+     * in <i>scope</i> and <i>excludedNodes</i>;
+     * @param scope a path string that may start with ~
+     * @param excludedNodes a list of nodes
+     * @return number of available nodes
+     */
+    int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes);
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index d6756f8..dcf4cad 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -789,15 +789,7 @@ public class NetworkTopologyImpl implements 
NetworkTopology {
         }
     }
 
-    /**
-     * Return the number of leaves in <i>scope</i> but not in 
<i>excludedNodes</i>.
-     *
-     * <p>If scope starts with ~, return the number of nodes that are not
-     * in <i>scope</i> and <i>excludedNodes</i>;
-     * @param scope a path string that may start with ~
-     * @param excludedNodes a list of nodes
-     * @return number of available nodes
-     */
+    @Override
     public int countNumOfAvailableNodes(String scope, Collection<Node> 
excludedNodes) {
         boolean isExcluded = false;
         if (scope.startsWith("~")) {
@@ -815,7 +807,7 @@ public class NetworkTopologyImpl implements NetworkTopology 
{
                 }
             }
             Node n = getNode(scope);
-            int scopeNodeCount = 1;
+            int scopeNodeCount = 0;
             if (n instanceof InnerNode) {
                 scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
index 5c244f2..df80bf9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
@@ -21,6 +21,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -151,4 +152,9 @@ public class StabilizeNetworkTopology implements 
NetworkTopology {
     public Set<Node> getLeaves(String loc) {
         return impl.getLeaves(loc);
     }
+
+    @Override
+    public int countNumOfAvailableNodes(String scope, Collection<Node> 
excludedNodes) {
+        return impl.countNumOfAvailableNodes(scope, excludedNodes);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index dd791be..ad58849 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.replication;
 
-
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME;
@@ -30,9 +29,8 @@ import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDI
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
+import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
-import static org.apache.bookkeeper.replication.ReplicationStats.
-                                    
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
 import static 
org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
 
@@ -55,6 +53,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.BKException;
@@ -79,6 +78,7 @@ import 
org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -117,6 +117,8 @@ public class Auditor implements AutoCloseable {
     private volatile Future<?> auditTask;
     private Set<String> bookiesToBeAudited = Sets.newHashSet();
     private volatile int lostBookieRecoveryDelayBeforeChange;
+    private final AtomicInteger ledgersNotAdheringToPlacementPolicyGuageValue;
+    private final AtomicInteger numOfLedgersFoundInPlacementPolicyCheck;
 
     private final StatsLogger statsLogger;
     @StatsDoc(
@@ -175,11 +177,10 @@ public class Auditor implements AutoCloseable {
     )
     private final Counter numDelayedBookieAuditsCancelled;
     @StatsDoc(
-        name = 
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
-        help = "total number of "
-            + "ledgers failed to adhere to EnsemblePlacementPolicy found in 
PLACEMENT POLICY check"
+            name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
+            help = "Gauge for number of ledgers not adhering to placement 
policy found in placement policy check"
     )
-    private final Counter 
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy;
+    private final Gauge<Integer> numLedgersNotAdheringToPlacementPolicy;
 
     static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws 
InterruptedException, IOException {
         return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -230,6 +231,8 @@ public class Auditor implements AutoCloseable {
         this.conf = conf;
         this.bookieIdentifier = bookieIdentifier;
         this.statsLogger = statsLogger;
+        this.numOfLedgersFoundInPlacementPolicyCheck = new AtomicInteger(0);
+        this.ledgersNotAdheringToPlacementPolicyGuageValue = new 
AtomicInteger(0);
 
         numUnderReplicatedLedger = 
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
         uRLPublishTimeForLostBookies = this.statsLogger
@@ -245,8 +248,20 @@ public class Auditor implements AutoCloseable {
         numBookieAuditsDelayed = 
this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
         numDelayedBookieAuditsCancelled = this.statsLogger
                 
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
-        placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = statsLogger
-                
.getCounter(ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+        numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return ledgersNotAdheringToPlacementPolicyGuageValue.get();
+            }
+        };
+        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
+                numLedgersNotAdheringToPlacementPolicy);
+
         this.bkc = bkc;
         this.ownBkc = ownBkc;
         initialize(conf, bkc);
@@ -612,12 +627,33 @@ public class Auditor implements AutoCloseable {
                         Stopwatch stopwatch = Stopwatch.createStarted();
                         LOG.info("Starting PlacementPolicyCheck");
                         placementPolicyCheck();
+                        int numOfLedgersFoundInPlacementPolicyCheckValue = 
numOfLedgersFoundInPlacementPolicyCheck
+                                .get();
                         long placementPolicyCheckDuration = 
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
-                        LOG.info("Completed placementPolicyCheck in {} 
milliSeconds", placementPolicyCheckDuration);
+                        LOG.info(
+                                "Completed placementPolicyCheck in {} 
milliSeconds."
+                                + " numOfLedgersNotAdheringToPlacementPolicy 
{}",
+                                placementPolicyCheckDuration, 
numOfLedgersFoundInPlacementPolicyCheckValue);
+                        ledgersNotAdheringToPlacementPolicyGuageValue
+                                
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
                         
placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
                                 TimeUnit.MILLISECONDS);
                     } catch (BKAuditException e) {
-                        LOG.error("BKAuditException running periodic 
placementPolicy check", e);
+                        int numOfLedgersFoundInPlacementPolicyCheckValue = 
numOfLedgersFoundInPlacementPolicyCheck
+                                .get();
+                        if (numOfLedgersFoundInPlacementPolicyCheckValue > 0) {
+                            /*
+                             * Although placementPolicyCheck failed with a
+                             * BKAuditException, it had already found ledgers
+                             * not adhering to the placement policy, so report them.
+                             */
+                            ledgersNotAdheringToPlacementPolicyGuageValue
+                                    
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
+                        }
+                        LOG.error(
+                                "BKAuditException running periodic 
placementPolicy check."
+                                        + 
"numOfLedgersNotAdheringToPlacementPolicy {}",
+                                numOfLedgersFoundInPlacementPolicyCheckValue, 
e);
                     }
                 }
             }, initialDelay, interval, TimeUnit.SECONDS);
@@ -893,6 +929,7 @@ public class Auditor implements AutoCloseable {
 
     void placementPolicyCheck() throws BKAuditException {
         final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
+        this.numOfLedgersFoundInPlacementPolicyCheck.set(0);
         Processor<Long> ledgerProcessor = new Processor<Long>() {
             @Override
             public void process(Long ledgerId, AsyncCallback.VoidCallback 
iterCallback) {
@@ -920,7 +957,7 @@ public class Auditor implements AutoCloseable {
                                 }
                             }
                             if (foundSegmentNotAdheringToPlacementPolicy) {
-                                
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.inc();
+                                
numOfLedgersFoundInPlacementPolicyCheck.incrementAndGet();
                             }
                         } else {
                             if (LOG.isDebugEnabled()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index e9b8999..c13bc56 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -40,8 +40,7 @@ public interface ReplicationStats {
     String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
     String NUM_BOOKIE_AUDITS_DELAYED = "NUM_BOOKIE_AUDITS_DELAYED";
     String NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED = 
"NUM_DELAYED_BOOKIE_AUDITS_CANCELLED";
-    String 
PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER =
-            
"PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
+    String NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY = 
"NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY";
 
     String REPLICATION_WORKER_SCOPE = "replication_worker";
     String REREPLICATE_OP = "rereplicate";
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index a55b560..26fa23f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -29,6 +29,7 @@ import io.netty.util.HashedWheelTimer;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,8 +51,12 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.collections4.CollectionUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -740,17 +745,22 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
     @Test
     public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
+        String defaultRackForThisTest = 
NetworkTopology.DEFAULT_REGION_AND_RACK;
         repp.uninitalize();
+        updateMyRack(defaultRackForThisTest);
 
         int minNumRacksPerWriteQuorum = 4;
         ClientConfiguration clientConf = new ClientConfiguration(conf);
         clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
         // set enforceMinNumRacksPerWriteQuorum
         clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
         repp = new RackawareEnsemblePlacementPolicy();
-        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
-        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL, statsLogger);
+        repp.withDefaultRack(defaultRackForThisTest);
+        Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+                
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
 
         int numOfRacks = 3;
         int numOfBookiesPerRack = 5;
@@ -764,26 +774,41 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
             }
         }
 
-        repp.onClusterChanged(new 
HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
-                new HashSet<BookieSocketAddress>());
+        int numOfBookiesInDefaultRack = 5;
+        BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new 
BookieSocketAddress[numOfBookiesInDefaultRack];
+        for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+            bookieSocketAddressesInDefaultRack[i] = new 
BookieSocketAddress("128.0.0." + (100 + i), 3181);
+            
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+                    defaultRackForThisTest);
+        }
+
+        List<BookieSocketAddress> nonDefaultRackBookiesList = 
Arrays.asList(bookieSocketAddresses);
+        List<BookieSocketAddress> defaultRackBookiesList = 
Arrays.asList(bookieSocketAddressesInDefaultRack);
+        Set<BookieSocketAddress> writableBookies = new 
HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
+        writableBookies.addAll(defaultRackBookiesList);
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
 
         try {
+            // this newEnsemble call will exclude default rack bookies
             repp.newEnsemble(8, 4, 4, null, new HashSet<>());
-            fail("Should get not enough bookies exception since there are only 
3 racks");
+            fail("Should get not enough bookies exception since there are only 
3 non-default racks");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
 
         try {
-            repp.newEnsemble(8, 4, 4, new HashSet<>(),
+            repp.newEnsemble(8, 4, 4, new HashSet<>(defaultRackBookiesList),
                     EnsembleForReplacementWithNoConstraints.INSTANCE, 
TruePredicate.INSTANCE);
-            fail("Should get not enough bookies exception since there are only 
3 racks");
+            fail("Should get not enough bookies exception since there are only 
3 non-default racks"
+                    + " and defaultrack bookies are excluded");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
 
         /*
          * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
-         * and there are enough bookies in 3 racks, this newEnsemble calls 
should
-         * succeed.
+         * and there are enough bookies in 3 racks, these newEnsemble calls
+         * should succeed.
          */
         EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> 
ensembleResponse;
         List<BookieSocketAddress> ensemble;
@@ -799,8 +824,9 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
         assertTrue(isEnsembleAdheringToPlacementPolicy);
 
-        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, new HashSet<>(),
-                EnsembleForReplacementWithNoConstraints.INSTANCE, 
TruePredicate.INSTANCE);
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize,
+                new HashSet<>(defaultRackBookiesList), 
EnsembleForReplacementWithNoConstraints.INSTANCE,
+                TruePredicate.INSTANCE);
         ensemble = ensembleResponse.getResult();
         isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.isStrictlyAdheringToPolicy();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
@@ -872,17 +898,23 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
     @Test
     public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
+        String defaultRackForThisTest = 
NetworkTopology.DEFAULT_REGION_AND_RACK;
         repp.uninitalize();
+        updateMyRack(defaultRackForThisTest);
 
         int minNumRacksPerWriteQuorum = 4;
         ClientConfiguration clientConf = new ClientConfiguration(conf);
         clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
         // set enforceMinNumRacksPerWriteQuorum
         clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
         repp = new RackawareEnsemblePlacementPolicy();
         repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL,
-                NullStatsLogger.INSTANCE);
-        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+                statsLogger);
+        repp.withDefaultRack(defaultRackForThisTest);
+        Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+                
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
 
         int numOfRacks = 3;
         int numOfBookiesPerRack = 5;
@@ -900,8 +932,25 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
                 bookieRackMap.put(bookieAddress, rack);
             }
         }
+        /*
+         * Bookies in the default rack should not be returned in the
+         * replaceBookie response.
+         */
+        int numOfBookiesInDefaultRack = 5;
+        BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new 
BookieSocketAddress[numOfBookiesInDefaultRack];
+        for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+            bookieSocketAddressesInDefaultRack[i] = new 
BookieSocketAddress("127.0.0." + (i + 100), 3181);
+            
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+                    defaultRackForThisTest);
+        }
 
-        repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
+        Set<BookieSocketAddress> nonDefaultRackBookiesList = 
bookieSocketAddresses;
+        List<BookieSocketAddress> defaultRackBookiesList = 
Arrays.asList(bookieSocketAddressesInDefaultRack);
+        Set<BookieSocketAddress> writableBookies = new 
HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
+        writableBookies.addAll(defaultRackBookiesList);
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
 
         /*
          * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
@@ -927,9 +976,12 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          */
         StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(), 
rackOfOtherBookieInEnsemble);
         bookieSocketAddresses.add(newBookieAddress1);
+        writableBookies.add(newBookieAddress1);
         bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
 
-        repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
         try {
             repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, 
null,
                     ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
@@ -947,9 +999,12 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          */
         StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(), 
newRack);
         bookieSocketAddresses.add(newBookieAddress2);
+        writableBookies.add(newBookieAddress2);
         bookieRackMap.put(newBookieAddress2, newRack);
 
-        repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
         /*
          * this replaceBookie should succeed, because a new bookie is added to 
a
          * new rack.
@@ -966,7 +1021,9 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
         bookiesToExclude.add(newBookieAddress2);
-        repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
         try {
             repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, 
null, ensemble,
                     bookieInEnsembleToBeReplaced, bookiesToExclude);
@@ -984,9 +1041,12 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          */
         StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(), 
rackOfBookieToBeReplaced);
         bookieSocketAddresses.add(newBookieAddress3);
+        writableBookies.add(newBookieAddress3);
         bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
 
-        repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
         /*
          * here we have added new bookie to the rack of the bookie to be
          * replaced, so we should be able to replacebookie though
@@ -2023,4 +2083,154 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
         assertTrue(shuffleOccurred);
     }
 
+    @Test
+    public void testNumBookiesInDefaultRackGauge() throws Exception {
+        String defaultRackForThisTest = 
NetworkTopology.DEFAULT_REGION_AND_RACK;
+        repp.uninitalize();
+        updateMyRack(defaultRackForThisTest);
+
+        // Update cluster
+        BookieSocketAddress newAddr1 = new BookieSocketAddress("127.0.0.100", 
3181);
+        BookieSocketAddress newAddr2 = new BookieSocketAddress("127.0.0.101", 
3181);
+        BookieSocketAddress newAddr3 = new BookieSocketAddress("127.0.0.102", 
3181);
+        BookieSocketAddress newAddr4 = new BookieSocketAddress("127.0.0.103", 
3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(newAddr1.getHostName(), 
defaultRackForThisTest);
+        StaticDNSResolver.addNodeToRack(newAddr2.getHostName(), 
"/default-region/r2");
+        StaticDNSResolver.addNodeToRack(newAddr3.getHostName(), 
"/default-region/r3");
+        StaticDNSResolver.addNodeToRack(newAddr4.getHostName(), 
defaultRackForThisTest);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, 
DISABLE_ALL, statsLogger);
+        repp.withDefaultRack(defaultRackForThisTest);
+
+        Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+                
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
+
+        Set<BookieSocketAddress> writeableBookies = new 
HashSet<BookieSocketAddress>();
+        writeableBookies.add(newAddr1);
+        writeableBookies.add(newAddr2);
+        Set<BookieSocketAddress> readOnlyBookies = new 
HashSet<BookieSocketAddress>();
+        readOnlyBookies.add(newAddr3);
+        readOnlyBookies.add(newAddr4);
+        repp.onClusterChanged(writeableBookies, readOnlyBookies);
+        // only writable bookie - newAddr1 in default rack
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1, 
numBookiesInDefaultRackGauge.getSample());
+
+        readOnlyBookies.remove(newAddr4);
+        writeableBookies.add(newAddr4);
+        repp.onClusterChanged(writeableBookies, readOnlyBookies);
+        // newAddr4 is also added to writable bookie so 2 writable bookies -
+        // newAddr1 and newAddr4
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 2, 
numBookiesInDefaultRackGauge.getSample());
+
+        // newAddr4 rack is changed and it is not in default anymore
+        StaticDNSResolver.changeRack(Arrays.asList(newAddr4), 
Arrays.asList("/default-region/r4"));
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1, 
numBookiesInDefaultRackGauge.getSample());
+
+        writeableBookies.clear();
+        // writeableBookies is empty so 0 writable bookies in default rack
+        repp.onClusterChanged(writeableBookies, readOnlyBookies);
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0, 
numBookiesInDefaultRackGauge.getSample());
+
+        StaticDNSResolver.changeRack(Arrays.asList(newAddr1), 
Arrays.asList("/default-region/r2"));
+        readOnlyBookies.clear();
+        writeableBookies.add(newAddr1);
+        writeableBookies.add(newAddr2);
+        writeableBookies.add(newAddr3);
+        writeableBookies.add(newAddr4);
+        repp.onClusterChanged(writeableBookies, readOnlyBookies);
+        // newAddr1 rack is changed and it is not in default anymore. So no
+        // bookies in default rack anymore
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0, 
numBookiesInDefaultRackGauge.getSample());
+    }
+
+    @Test
+    public void testNewEnsembleExcludesDefaultRackBookiesEnforceMinNumRacks() 
throws Exception {
+        String defaultRackForThisTest = 
NetworkTopology.DEFAULT_REGION_AND_RACK;
+        repp.uninitalize();
+        updateMyRack(defaultRackForThisTest);
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL, statsLogger);
+        repp.withDefaultRack(defaultRackForThisTest);
+        Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
+                
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
+
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 3;
+        int effectiveMinNumRacksPerWriteQuorum = 
Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
+
+        int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
+        int numOfBookiesPerRack = 20;
+        BookieSocketAddress[] bookieSocketAddresses = new 
BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddresses[index] = new 
BookieSocketAddress("128.0.0." + index, 3181);
+                
StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), 
"/default-region/r" + i);
+            }
+        }
+
+        int numOfBookiesInDefaultRack = 10;
+        BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new 
BookieSocketAddress[numOfBookiesInDefaultRack];
+        for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+            bookieSocketAddressesInDefaultRack[i] = new 
BookieSocketAddress("127.0.0." + (i + 100), 3181);
+            
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
+                    defaultRackForThisTest);
+        }
+
+        Set<BookieSocketAddress> writableBookies = new 
HashSet<BookieSocketAddress>(
+                Arrays.asList(bookieSocketAddresses));
+        
writableBookies.addAll(Arrays.asList(bookieSocketAddressesInDefaultRack));
+        repp.onClusterChanged(writableBookies, new 
HashSet<BookieSocketAddress>());
+        assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 
numOfBookiesInDefaultRack,
+                numBookiesInDefaultRackGauge.getSample());
+
+        /*
+         * In this scenario there are enough racks (2 *
+         * effectiveMinNumRacksPerWriteQuorum - 1) and plenty of bookies in
+         * each rack, so we should be able to create an ensemble for all
+         * ensembleSizes (as long as there are enough bookies in each
+         * rack).
+         *
+         * Since minNumRacksPerWriteQuorum is enforced, no bookie from the
+         * default rack should be selected.
+         */
+        EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> 
ensembleResponse;
+        List<BookieSocketAddress> ensemble;
+        boolean isEnsembleAdheringToPlacementPolicy;
+        for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; 
ensembleSize < 40; ensembleSize++) {
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+            ensemble = ensembleResponse.getResult();
+            isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.isStrictlyAdheringToPolicy();
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
+
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+            ensemble = ensembleResponse.getResult();
+            isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.isStrictlyAdheringToPolicy();
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
+            Collection<BookieSocketAddress> bookiesOfDefaultRackInEnsemble = 
CollectionUtils
+                    
.intersection(Arrays.asList(bookieSocketAddressesInDefaultRack), ensemble);
+            assertTrue("Ensemble is not supposed to contain bookies from 
default rack, but ensemble contains - "
+                    + bookiesOfDefaultRackInEnsemble, 
bookiesOfDefaultRackInEnsemble.isEmpty());
+        }
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 68b5df2..69134a3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -52,10 +52,10 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.replication.AuditorPeriodicCheckTest.TestAuditor;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
-import org.apache.bookkeeper.test.TestStatsProvider.TestCounter;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
@@ -182,14 +182,14 @@ public class AuditorPlacementPolicyCheckTest extends 
BookKeeperClusterTestCase {
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
         try {
             TestStatsLogger statsLogger = 
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
-            TestCounter 
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) 
statsLogger.getCounter(
-                    
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = 
statsLogger
+                    
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
             /*
              * since all of the bookies are in different racks, there 
shouldn't be any ledger not adhering
              * to placement policy.
              */
-            
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
 SuccessCount", 0L,
-                    
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage 
value", 0,
+                    ledgersNotAdheringToPlacementPolicyGuage.getSample());
         } finally {
             Auditor auditor = auditorRef.getValue();
             if (auditor != null) {
@@ -263,11 +263,10 @@ public class AuditorPlacementPolicyCheckTest extends 
BookKeeperClusterTestCase {
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
         try {
             TestStatsLogger statsLogger = 
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
-            TestCounter 
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) 
statsLogger.getCounter(
-                    
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
-            
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
 SuccessCount",
-                    (long) numOfLedgersNotAdheringToPlacementPolicy,
-                    
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = 
statsLogger
+                    
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage 
value",
+                    numOfLedgersNotAdheringToPlacementPolicy, 
ledgersNotAdheringToPlacementPolicyGuage.getSample());
         } finally {
             Auditor auditor = auditorRef.getValue();
             if (auditor != null) {
@@ -362,11 +361,10 @@ public class AuditorPlacementPolicyCheckTest extends 
BookKeeperClusterTestCase {
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
         try {
             TestStatsLogger statsLogger = 
startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
-            TestCounter 
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) 
statsLogger.getCounter(
-                    
ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
-            
assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER
 SuccessCount",
-                    (long) numOfLedgersNotAdheringToPlacementPolicy,
-                    
placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = 
statsLogger
+                    
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY gauge 
value",
+                    numOfLedgersNotAdheringToPlacementPolicy, 
ledgersNotAdheringToPlacementPolicyGuage.getSample());
         } finally {
             Auditor auditor = auditorRef.getValue();
             if (auditor != null) {

Reply via email to