sijie closed pull request #1883: Enhance EnsemblePlacementPolicy and 
DNSResolverDecorator to log relevant metrics.
URL: https://github.com/apache/bookkeeper/pull/1883
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cdafd15a99..b58514b670 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -69,6 +69,8 @@
     String WATCHER_SCOPE = "bookie_watcher";
     String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
     String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+    String FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER = 
"FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER";
+    String ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER = 
"ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
 
     // Bookie Operations
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 46978ea5c6..7b7cc46bcf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -36,6 +36,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,9 +81,9 @@ public void uninitalize() {
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
-        BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, 
int writeQuorumSize, int ackQuorumSize,
+            java.util.Map<String, byte[]> customMetadata, 
List<BookieSocketAddress> currentEnsemble,
+            BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         throw new BKNotEnoughBookiesException();
     }
@@ -109,18 +110,24 @@ public void registerSlowBookie(BookieSocketAddress 
bookieSocketAddress, long ent
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
             throw new IllegalArgumentException("Local ensemble policy can only 
return 1 bookie");
         }
 
-        return Lists.newArrayList(bookieAddress);
+        return Pair.of(Lists.newArrayList(bookieAddress), true);
     }
 
     @Override
     public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> 
bookieToFreeSpaceMap) {
         return;
     }
+
+    @Override
+    public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        return true;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 3f837dbaf7..7b492922f1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,7 +35,6 @@
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -82,6 +81,7 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -994,15 +994,25 @@ private void asyncRecoverLedgerFragment(final 
LedgerHandle lh,
         // allocate bookies
         for (Integer bookieIndex : bookieIndexesToRereplicate) {
             BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
-            BookieSocketAddress newBookie =
+            Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
                     bkc.getPlacementPolicy().replaceBookie(
                             lh.getLedgerMetadata().getEnsembleSize(),
                             lh.getLedgerMetadata().getWriteQuorumSize(),
                             lh.getLedgerMetadata().getAckQuorumSize(),
                             lh.getLedgerMetadata().getCustomMetadata(),
-                            new HashSet<>(ensemble),
+                            ensemble,
                             oldBookie,
                             bookiesToExclude);
+            BookieSocketAddress newBookie = replaceBookieResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "replaceBookie for bookie: {} in ensemble: {} "
+                                    + "is not adhering to placement policy and 
chose {}",
+                            oldBookie, ensemble, newBookie);
+                }
+            }
             targetBookieAddresses.put(bookieIndex, newBookie);
             bookiesToExclude.add(newBookie);
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index 8b14b77174..74c1df984b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
@@ -35,6 +36,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.BookKeeperServerStats;
 import org.apache.bookkeeper.client.BKException.BKInterruptedException;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -43,9 +46,11 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * This class is responsible for maintaining a consistent view of what bookies
@@ -88,6 +93,12 @@
         help = "operation stats of replacing bookie in an ensemble"
     )
     private final OpStatsLogger replaceBookieTimer;
+    @StatsDoc(
+            name = ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
+            help = "total number of newEnsemble/replaceBookie operations 
failed to adhere"
+            + " EnsemblePlacementPolicy"
+    )
+    private final Counter ensembleNotAdheringToPlacementPolicy;
 
     // Bookies that will not be preferred to be chosen in a new ensemble
     final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -117,6 +128,8 @@ public void 
onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie)
                 }).build();
         this.newEnsembleTimer = 
statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
         this.replaceBookieTimer = 
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
+        this.ensembleNotAdheringToPlacementPolicy = statsLogger
+                
.getCounter(BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
     }
 
     @Override
@@ -213,19 +226,34 @@ public void initialBlockingBookieRead() throws 
BKException {
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
+        Pair<List<BookieSocketAddress>, Boolean> newEnsembleResponse;
         List<BookieSocketAddress> socketAddresses;
+        boolean isEnsembleAdheringToPlacementPolicy = false;
         try {
-            socketAddresses = placementPolicy.newEnsemble(ensembleSize,
-                    writeQuorumSize, ackQuorumSize, customMetadata, new 
HashSet<BookieSocketAddress>(
-                            quarantinedBookies.asMap().keySet()));
+            Set<BookieSocketAddress> quarantinedBookiesSet = 
quarantinedBookies.asMap().keySet();
+            newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, 
writeQuorumSize, ackQuorumSize,
+                    customMetadata, new 
HashSet<BookieSocketAddress>(quarantinedBookiesSet));
+            socketAddresses = newEnsembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
newEnsembleResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn("New ensemble: {} is not adhering to Placement 
Policy. quarantinedBookies: {}",
+                        socketAddresses, quarantinedBookiesSet);
+            }
             // we try to only get from the healthy bookies first
             newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using 
quarantined bookies");
             }
-            socketAddresses = placementPolicy.newEnsemble(
+            newEnsembleResponse = placementPolicy.newEnsemble(
                     ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata, new HashSet<>());
+            socketAddresses = newEnsembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
newEnsembleResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn("New ensemble: {} is not adhering to Placement 
Policy", socketAddresses);
+            }
             newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
         }
         return socketAddresses;
@@ -239,22 +267,43 @@ public BookieSocketAddress replaceBookie(int 
ensembleSize, int writeQuorumSize,
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
         BookieSocketAddress socketAddress;
+        boolean isEnsembleAdheringToPlacementPolicy = false;
         try {
             // we exclude the quarantined bookies also first
-            Set<BookieSocketAddress> existingAndQuarantinedBookies = new 
HashSet<BookieSocketAddress>(existingBookies);
-            
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            socketAddress = placementPolicy.replaceBookie(
+            Set<BookieSocketAddress> excludedBookiesAndQuarantinedBookies = 
new HashSet<BookieSocketAddress>(
+                    excludeBookies);
+            Set<BookieSocketAddress> quarantinedBookiesSet = 
quarantinedBookies.asMap().keySet();
+            excludedBookiesAndQuarantinedBookies.addAll(quarantinedBookiesSet);
+            replaceBookieResponse = placementPolicy.replaceBookie(
                     ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata,
-                    existingAndQuarantinedBookies, addr, excludeBookies);
+                    existingBookies, addr, 
excludedBookiesAndQuarantinedBookies);
+            socketAddress = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn(
+                        "replaceBookie for bookie: {} in ensemble: {} is not 
adhering to placement policy and"
+                                + " chose {}. excludedBookies {} and 
quarantinedBookies {}",
+                        addr, existingBookies, socketAddress, excludeBookies, 
quarantinedBookiesSet);
+            }
             replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using 
quarantined bookies");
             }
-            socketAddress = placementPolicy.replaceBookie(
-                    ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata,
-                    new HashSet<BookieSocketAddress>(existingBookies), addr, 
excludeBookies);
+            replaceBookieResponse = 
placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    customMetadata, existingBookies, addr, excludeBookies);
+            socketAddress = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn(
+                        "replaceBookie for bookie: {} in ensemble: {} is not 
adhering to placement policy and"
+                                + " chose {}. excludedBookies {}",
+                        addr, existingBookies, socketAddress, excludeBookies);
+            }
             replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
         }
         return socketAddress;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 917d18f717..dddbe1cb40 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -39,6 +39,7 @@
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,12 +65,12 @@
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
quorumSize, int ackQuorumSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int quorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         ArrayList<BookieSocketAddress> newBookies = new 
ArrayList<BookieSocketAddress>(ensembleSize);
         if (ensembleSize <= 0) {
-            return newBookies;
+            return Pair.of(newBookies, false);
         }
         List<BookieSocketAddress> allBookies;
         rwLock.readLock().lock();
@@ -95,7 +96,8 @@
                     newBookies.add(b);
                     --ensembleSize;
                     if (ensembleSize == 0) {
-                        return newBookies;
+                        return Pair.of(newBookies,
+                                
isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
                     }
                 }
             } finally {
@@ -110,7 +112,8 @@
                 newBookies.add(bookie);
                 --ensembleSize;
                 if (ensembleSize == 0) {
-                    return newBookies;
+                    return Pair.of(newBookies,
+                            isEnsembleAdheringToPlacementPolicy(newBookies, 
quorumSize, ackQuorumSize));
                 }
             }
         }
@@ -118,13 +121,17 @@
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, 
int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         excludeBookies.addAll(currentEnsemble);
-        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, 
customMetadata, excludeBookies);
-        return addresses.get(0);
+        List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, 
customMetadata, excludeBookies).getLeft();
+
+        BookieSocketAddress candidateAddr = addresses.get(0);
+        List<BookieSocketAddress> newEnsemble = new 
ArrayList<BookieSocketAddress>(currentEnsemble);
+        newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), 
candidateAddr);
+        return Pair.of(candidateAddr, 
isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, 
ackQuorumSize));
     }
 
     @Override
@@ -210,4 +217,10 @@ public void updateBookieInfo(Map<BookieSocketAddress, 
BookieInfo> bookieInfoMap)
     public void uninitalize() {
         // do nothing
     }
+
+    @Override
+    public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        return true;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 23932a3d7d..00bac8e3c4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * {@link EnsemblePlacementPolicy} encapsulates the algorithm that bookkeeper 
client uses to select a number of bookies
@@ -263,12 +264,12 @@ EnsemblePlacementPolicy initialize(ClientConfiguration 
conf,
      * @throws BKNotEnoughBookiesException if not enough bookies available.
      * @return the List&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
      */
-    List<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                               int writeQuorumSize,
-                                               int ackQuorumSize,
-                                               Map<String, byte[]> 
customMetadata,
-                                               Set<BookieSocketAddress> 
excludeBookies)
-        throws BKNotEnoughBookiesException;
+    Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
+                                                         int writeQuorumSize,
+                                                         int ackQuorumSize,
+                                                         Map<String, byte[]> 
customMetadata,
+                                                         
Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException;
 
     /**
      * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie 
available in the cluster,
@@ -287,14 +288,14 @@ EnsemblePlacementPolicy initialize(ClientConfiguration 
conf,
      * @throws BKNotEnoughBookiesException
      * @return the org.apache.bookkeeper.net.BookieSocketAddress
      */
-    BookieSocketAddress replaceBookie(int ensembleSize,
-                                      int writeQuorumSize,
-                                      int ackQuorumSize,
-                                      Map<String, byte[]> customMetadata,
-                                      Set<BookieSocketAddress> currentEnsemble,
-                                      BookieSocketAddress bookieToReplace,
-                                      Set<BookieSocketAddress> excludeBookies)
-        throws BKNotEnoughBookiesException;
+    Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
+                                                     int writeQuorumSize,
+                                                     int ackQuorumSize,
+                                                     Map<String, byte[]> 
customMetadata,
+                                                     List<BookieSocketAddress> 
currentEnsemble,
+                                                     BookieSocketAddress 
bookieToReplace,
+                                                     Set<BookieSocketAddress> 
excludeBookies)
+            throws BKNotEnoughBookiesException;
 
     /**
      * Register a bookie as slow so that it is tried after available and 
read-only bookies.
@@ -386,4 +387,25 @@ default int getStickyReadBookieIndex(LedgerMetadata 
metadata, Optional<Integer>
             return (currentStickyBookieIndex.get() + 1) % 
metadata.getEnsembleSize();
         }
     }
+
+    /**
+     * returns true if the Ensemble is strictly adhering to placement policy,
+     * like in the case of RackawareEnsemblePlacementPolicy, bookies in the
+     * writeset are from 'minNumRacksPerWriteQuorum' number of racks. And in 
the
+     * case of RegionawareEnsemblePlacementPolicy, check for
+     * minimumRegionsForDurability, reppRegionsToWrite, rack distribution 
within
+     * a region and other parameters of RegionAwareEnsemblePlacementPolicy.
+     *
+     * @param ensembleList
+     *            list of BookieSocketAddress of bookies in the ensemble
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @return
+     */
+    default boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        return false;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 7c9e07cd10..254f5359c2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.Node;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * Interface for topology aware ensemble placement policy.
@@ -93,7 +94,7 @@
      * @return list of bookies forming the ensemble
      * @throws BKException.BKNotEnoughBookiesException
      */
-    List<BookieSocketAddress> newEnsemble(
+    Pair<List<BookieSocketAddress>, Boolean> newEnsemble(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1fd7580c81..8054d97187 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -18,19 +18,16 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import 
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
-import 
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
-import 
org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * A placement policy implementation use rack information for placing 
ensembles.
@@ -95,8 +92,8 @@ public void uninitalize() {
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
             return super.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, customMetadata, excludeBookies);
@@ -110,8 +107,8 @@ public void uninitalize() {
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, 
int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
        try {
@@ -146,7 +143,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  Set<BookieSocketAddress> 
excludeBookies,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7578c416ee..6db7de8f72 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static 
org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
 import static 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
@@ -35,6 +36,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,10 +66,12 @@
 import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,12 +150,19 @@ public void reloadCachedMappings() {
 
         final Supplier<String> defaultRackSupplier;
         final DNSToSwitchMapping resolver;
-
-        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> 
defaultRackSupplier) {
+        @StatsDoc(
+                name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+                help = "total number of times Resolver failed to resolve rack 
information of a node"
+        )
+        final Counter failedToResolveNetworkLocationCounter;
+
+        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> 
defaultRackSupplier,
+                Counter failedToResolveNetworkLocationCounter) {
             checkNotNull(resolver, "Resolver cannot be null");
             checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
             this.defaultRackSupplier = defaultRackSupplier;
             this.resolver = resolver;
+            this.failedToResolveNetworkLocationCounter = 
failedToResolveNetworkLocationCounter;
         }
 
         public List<String> resolve(List<String> names) {
@@ -167,6 +178,7 @@ public void reloadCachedMappings() {
                     if (rNames.get(i) == null) {
                         LOG.warn("Failed to resolve network location for {}, 
using default rack for it : {}.",
                                 names.get(i), defaultRack);
+                        failedToResolveNetworkLocationCounter.inc();
                         rNames.set(i, defaultRack);
                     }
                 }
@@ -178,6 +190,7 @@ public void reloadCachedMappings() {
             rNames = new ArrayList<>(names.size());
 
             for (int i = 0; i < names.size(); ++i) {
+                failedToResolveNetworkLocationCounter.inc();
                 rNames.add(defaultRack);
             }
             return rNames;
@@ -227,6 +240,7 @@ public void reloadCachedMappings() {
         help = "The distribution of number of bookies reordered on each read 
request"
     )
     protected OpStatsLogger readReorderedCounter = null;
+    protected Counter failedToResolveNetworkLocationCounter = null;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -267,10 +281,13 @@ protected RackawareEnsemblePlacementPolicyImpl 
initialize(DNSToSwitchMapping dns
         this.bookiesJoinedCounter = 
statsLogger.getOpStatsLogger(BOOKIES_JOINED);
         this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
         this.readReorderedCounter = 
statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
+        this.failedToResolveNetworkLocationCounter = statsLogger
+                .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
         this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
-        this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> 
this.getDefaultRack());
+        this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> 
this.getDefaultRack(),
+                failedToResolveNetworkLocationCounter);
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
         this.enforceMinNumRacksPerWriteQuorum = 
enforceMinNumRacksPerWriteQuorum;
@@ -342,9 +359,18 @@ public RackawareEnsemblePlacementPolicyImpl 
initialize(ClientConfiguration conf,
                     ((RackChangeNotifier) 
dnsResolver).registerRackChangeListener(this);
                 }
             } catch (RuntimeException re) {
-                LOG.info("Failed to initialize DNS Resolver {}, used default 
subnet resolver : {}",
-                    dnsResolverName, re, re.getMessage());
-                dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
+                if (!enforceMinNumRacksPerWriteQuorum) {
+                    LOG.info("Failed to initialize DNS Resolver {}, used 
default subnet resolver : {}", dnsResolverName,
+                            re, re.getMessage());
+                    dnsResolver = new DefaultResolver(() -> 
this.getDefaultRack());
+                } else {
+                    /*
+                     * if minNumRacksPerWriteQuorum is enforced, then it
+                     * shouldn't continue in the case of failure to create
+                     * dnsResolver.
+                     */
+                    throw re;
+                }
             }
         }
         slowBookies = CacheBuilder.newBuilder()
@@ -479,7 +505,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
         }
     }
 
-    protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> 
excludeBookies) {
+    protected Set<Node> convertBookiesToNodes(Collection<BookieSocketAddress> 
excludeBookies) {
         Set<Node> nodes = new HashSet<Node>();
         for (BookieSocketAddress addr : excludeBookies) {
             BookieNode bn = knownBookies.get(addr);
@@ -500,13 +526,13 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(ensembleSize, writeQuorumSize, 
excludeBookies, null, null);
     }
 
-    protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+    protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(int 
ensembleSize,
                                                             int 
writeQuorumSize,
                                                             
Set<BookieSocketAddress> excludeBookies,
                                                             
Ensemble<BookieNode> parentEnsemble,
@@ -522,7 +548,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  Set<BookieSocketAddress> 
excludeBookies,
@@ -538,7 +564,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                 parentPredicate);
     }
 
-    protected List<BookieSocketAddress> newEnsembleInternal(
+    protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
@@ -572,7 +598,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                 for (BookieNode bn : bns) {
                     addrs.add(bn.getAddr());
                 }
-                return addrs;
+                return Pair.of(addrs, false);
             }
 
             for (int i = 0; i < ensembleSize; i++) {
@@ -596,15 +622,15 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                           ensembleSize, bookieList);
                 throw new BKNotEnoughBookiesException();
             }
-            return bookieList;
+            return Pair.of(bookieList, 
isEnsembleAdheringToPlacementPolicy(bookieList, writeQuorumSize, 
ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, 
int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
@@ -639,7 +665,19 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", 
candidate, bn);
             }
-            return candidate.getAddr();
+            BookieSocketAddress candidateAddr = candidate.getAddr();
+            List<BookieSocketAddress> newEnsemble = new 
ArrayList<BookieSocketAddress>(currentEnsemble);
+            if (currentEnsemble.isEmpty()) {
+                /*
+                 * in testing code there are test cases which would pass empty
+                 * currentEnsemble
+                 */
+                newEnsemble.add(candidateAddr);
+            } else {
+                newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), 
candidateAddr);
+            }
+            return Pair.of(candidateAddr,
+                    isEnsembleAdheringToPlacementPolicy(newEnsemble, 
writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
@@ -1206,4 +1244,32 @@ static void 
shuffleWithMask(DistributionSchedule.WriteSet writeSet,
             }
         }
     }
+
+    @Override
+    public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        int ensembleSize = ensembleList.size();
+        int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+        BookieSocketAddress bookie;
+        for (int i = 0; i < ensembleList.size(); i++) {
+            racksOrRegionsInQuorum.clear();
+            for (int j = 0; j < writeQuorumSize; j++) {
+                bookie = ensembleList.get((i + j) % ensembleSize);
+                try {
+                    
racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+                } catch (Exception e) {
+                    /*
+                     * any issue/exception in analyzing whether ensemble is 
strictly adhering to
+                     * placement policy should be swallowed.
+                     */
+                    LOG.warn("Received exception while trying to get network 
location of bookie: {}", bookie, e);
+                }
+            }
+            if (racksOrRegionsInQuorum.size() < 
minNumRacksPerWriteQuorumForThisEnsemble) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index c52e0fee85..1bd4b75eac 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,8 +225,8 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
 
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
 
         int effectiveMinRegionsForDurability = 
disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -279,7 +279,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                 for (BookieNode bn : bns) {
                     addrs.add(bn.getAddr());
                 }
-                return addrs;
+                return Pair.of(addrs, 
isEnsembleAdheringToPlacementPolicy(addrs, writeQuorumSize, ackQuorumSize));
             }
 
             // Single region, fall back to RackAwareEnsemblePlacement
@@ -347,7 +347,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                             try {
                                 List<BookieSocketAddress> allocated = 
policyWithinRegion.newEnsemble(newEnsembleSize,
                                         newWriteQuorumSize, 
newWriteQuorumSize, excludeBookies, tempEnsemble,
-                                        tempEnsemble);
+                                        tempEnsemble).getLeft();
                                 ensemble = tempEnsemble;
                                 remainingEnsemble -= addToEnsembleSize;
                                 remainingWriteQuorum -= addToWriteQuorum;
@@ -407,15 +407,17 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
                 throw new BKException.BKNotEnoughBookiesException();
             }
             LOG.info("Bookies allocated successfully {}", ensemble);
-            return ensemble.toList();
+            List<BookieSocketAddress> ensembleList = ensemble.toList();
+            return Pair.of(ensembleList,
+                    isEnsembleAdheringToPlacementPolicy(ensembleList, 
writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, 
int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         rwLock.readLock().lock();
@@ -469,7 +471,19 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", 
candidate, bookieNodeToReplace);
             }
-            return candidate.getAddr();
+            BookieSocketAddress candidateAddr = candidate.getAddr();
+            List<BookieSocketAddress> newEnsemble = new 
ArrayList<BookieSocketAddress>(currentEnsemble);
+            if (currentEnsemble.isEmpty()) {
+                /*
+                 * in testing code there are test cases which would pass empty
+                 * currentEnsemble
+                 */
+                newEnsemble.add(candidateAddr);
+            } else {
+                newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), 
candidateAddr);
+            }
+            return Pair.of(candidateAddr,
+                    isEnsembleAdheringToPlacementPolicy(newEnsemble, 
writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
@@ -550,4 +564,16 @@ protected BookieNode replaceFromRack(BookieNode 
bookieNodeToReplace,
         finalList.addMissingIndices(ensemble.size());
         return finalList;
     }
+
+    @Override
+    public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        /**
+         * TODO: have to implement actual logic for this method for
+         * RegionAwareEnsemblePlacementPolicy. For now return true value.
+         *
+         * - https://github.com/apache/bookkeeper/issues/1898
+         */
+        return true;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 8c8350c3fc..230f66d411 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -25,8 +25,8 @@
 
 import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class implements the {@link DNSToSwitchMapping} interface using a
@@ -129,7 +129,7 @@ public void setConf(Configuration conf) {
     private static final class RawScriptBasedMapping extends 
AbstractDNSToSwitchMapping {
         private String scriptName;
         private int maxArgs; //max hostnames per call of the script
-        private static final Log LOG = 
LogFactory.getLog(ScriptBasedMapping.class);
+        private static final Logger LOG = 
LoggerFactory.getLogger(ScriptBasedMapping.class);
 
         /**
          * Set the configuration and extract the configuration parameters of 
interest.
@@ -233,7 +233,7 @@ private String runResolveCommand(List<String> args) {
                     s.execute();
                     allOutput.append(s.getOutput()).append(" ");
                 } catch (Exception e) {
-                    LOG.warn("Exception running " + s, e);
+                    LOG.warn("Exception running: {} Exception message: {}", s, 
e.getMessage());
                     return null;
                 }
                 loopCount++;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index bb55d0c6d0..205c5f4cb9 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -33,6 +33,7 @@
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -69,8 +70,8 @@ public GenericEnsemblePlacementPolicyTest(boolean 
diskWeightBasedPlacementEnable
     public static final class CustomEnsemblePlacementPolicy extends 
DefaultEnsemblePlacementPolicy {
 
         @Override
-        public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize,
-            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> currentEnsemble,
+        public Pair<BookieSocketAddress, Boolean> replaceBookie(int 
ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
             new Exception("replaceBookie " + ensembleSize + "," + 
customMetadata).printStackTrace();
@@ -81,7 +82,7 @@ public BookieSocketAddress replaceBookie(int ensembleSize, 
int writeQuorumSize,
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, 
int quorumSize,
+        public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int 
ensembleSize, int quorumSize,
             int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
             assertNotNull(customMetadata);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e75e93fa86..e6cd07bcc7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -52,6 +52,7 @@
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@
     ClientConfiguration conf = new ClientConfiguration();
     BookieSocketAddress addr1, addr2, addr3, addr4;
     io.netty.util.HashedWheelTimer timer;
+    final int minNumRacksPerWriteQuorumConfValue = 2;
 
     @Override
     protected void setUp() throws Exception {
@@ -80,6 +82,7 @@ protected void setUp() throws Exception {
         StaticDNSResolver.addNodeToRack("localhost", 
NetworkTopology.DEFAULT_REGION_AND_RACK);
         LOG.info("Set up static DNS Resolver.");
         conf.setProperty(REPP_DNS_RESOLVER_CLASS, 
StaticDNSResolver.class.getName());
+        conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
         addr1 = new BookieSocketAddress("127.0.0.2", 3181);
         addr2 = new BookieSocketAddress("127.0.0.3", 3181);
         addr3 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -564,8 +567,12 @@ public void testReplaceBookieWithEnoughBookiesInSameRack() 
throws Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
         assertEquals(addr3, replacedBookie);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -589,10 +596,13 @@ public void 
testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, excludedAddrs);
-
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || 
addr4.equals(replacedBookie));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -619,7 +629,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws 
Exception {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new 
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new 
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -628,7 +638,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws 
Exception {
 
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() 
throws Exception {
-        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.5", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
         BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -645,15 +655,18 @@ public void 
testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() throws Exce
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        Set<BookieSocketAddress> ensembleBookies = new 
HashSet<BookieSocketAddress>();
+        List<BookieSocketAddress> ensembleBookies = new 
ArrayList<BookieSocketAddress>();
         ensembleBookies.add(addr2);
         ensembleBookies.add(addr4);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(
             1, 1, 1 , null,
             ensembleBookies,
             addr4,
             new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
         assertEquals(addr1, replacedBookie);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -670,10 +683,18 @@ public void testNewEnsembleWithSingleRack() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, new 
HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.getRight();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum()));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2;
+            ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new 
HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = 
ensembleResponse2.getRight();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
+            assertFalse(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -703,17 +724,19 @@ public void testSingleRackWithEnforceMinNumRacks() throws 
Exception {
         addrs.add(addr3);
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, new 
HashSet<>());
+            ensemble = ensembleResponse.getLeft();
             fail("Should get not enough bookies exception since there is only 
one rack.");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
 
         try {
-            ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(), 
EnsembleForReplacementWithNoConstraints.INSTANCE,
-                    TruePredicate.INSTANCE);
+            ensembleResponse = repp.newEnsemble(3, 2, 2, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, 
TruePredicate.INSTANCE);
+            ensemble = ensembleResponse.getLeft();
             fail("Should get not enough bookies exception since there is only 
one rack.");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
@@ -766,19 +789,27 @@ public void testNewEnsembleWithEnforceMinNumRacks() 
throws Exception {
          * and there are enough bookies in 3 racks, this newEnsemble calls 
should
          * succeed.
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
+        boolean isEnsembleAdheringToPlacementPolicy;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+        ensemble = ensembleResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, new HashSet<>(),
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, new HashSet<>(),
                 EnsembleForReplacementWithNoConstraints.INSTANCE, 
TruePredicate.INSTANCE);
+        ensemble = ensembleResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -822,16 +853,24 @@ public void 
testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exc
          * ensembleSizes (as long as there are enough number of bookies in each
          * rack).
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
+        boolean isEnsembleAdheringToPlacementPolicy;
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; 
ensembleSize < 40; ensembleSize++) {
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+            ensemble = ensembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
 
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, new HashSet<>(),
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, new HashSet<>(),
                     EnsembleForReplacementWithNoConstraints.INSTANCE, 
TruePredicate.INSTANCE);
+            ensemble = ensembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
         }
     }
 
@@ -873,12 +912,14 @@ public void testReplaceBookieWithEnforceMinNumRacks() 
throws Exception {
          * and there are enough bookies in 3 racks, this newEnsemble call 
should
          * succeed.
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
+        ensemble = ensembleResponse.getLeft();
 
         BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
         // get rack of some other bookie
@@ -895,7 +936,7 @@ public void testReplaceBookieWithEnforceMinNumRacks() 
throws Exception {
         repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
         try {
             repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, 
null,
-                    new HashSet<BookieSocketAddress>(ensemble), 
bookieInEnsembleToBeReplaced, new HashSet<>());
+                    ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
             fail("Should get not enough bookies exception since there are no 
more bookies in rack"
                     + "of 'bookieInEnsembleToReplace'"
                     + "and new bookie added belongs to the rack of some other 
bookie in the ensemble");
@@ -917,16 +958,22 @@ public void testReplaceBookieWithEnforceMinNumRacks() 
throws Exception {
          * this replaceBookie should succeed, because a new bookie is added to 
a
          * new rack.
          */
-        BookieSocketAddress replacedBookieAddress = 
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
-                null, new HashSet<BookieSocketAddress>(ensemble), 
bookieInEnsembleToBeReplaced, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+        BookieSocketAddress replacedBookieAddress;
+        boolean isEnsembleAdheringToPlacementPolicy;
+        replaceBookieResponse = repp.replaceBookie(ensembleSize, 
writeQuorumSize, ackQuorumSize, null, ensemble,
+                bookieInEnsembleToBeReplaced, new HashSet<>());
+        replacedBookieAddress = replaceBookieResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals("It should be newBookieAddress2", newBookieAddress2, 
replacedBookieAddress);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
 
         Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
         bookiesToExclude.add(newBookieAddress2);
         repp.onClusterChanged(bookieSocketAddresses, new 
HashSet<BookieSocketAddress>());
         try {
-            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, 
null,
-                    new HashSet<BookieSocketAddress>(ensemble), 
bookieInEnsembleToBeReplaced, bookiesToExclude);
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, 
null, ensemble,
+                    bookieInEnsembleToBeReplaced, bookiesToExclude);
             fail("Should get not enough bookies exception since the only 
available bookie to replace"
                     + "is added to excludedBookies list");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -949,9 +996,12 @@ public void testReplaceBookieWithEnforceMinNumRacks() 
throws Exception {
          * replaced, so we should be able to replacebookie though
          * newBookieAddress2 is added to excluded bookies list.
          */
-        replacedBookieAddress = repp.replaceBookie(ensembleSize, 
writeQuorumSize, ackQuorumSize, null,
-                new HashSet<BookieSocketAddress>(ensemble), 
bookieInEnsembleToBeReplaced, bookiesToExclude);
+        replaceBookieResponse = repp.replaceBookie(ensembleSize, 
writeQuorumSize, ackQuorumSize, null,
+                ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
+        replacedBookieAddress = replaceBookieResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals("It should be newBookieAddress3", newBookieAddress3, 
replacedBookieAddress);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -1298,15 +1348,21 @@ public void testNewEnsembleWithMultipleRacks() throws 
Exception {
             int ensembleSize = 3;
             int writeQuorumSize = 2;
             int acqQuorumSize = 2;
-            List<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    acqQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.getRight();
             int numCovered = getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
             ensembleSize = 4;
-            List<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    acqQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = 
ensembleResponse2.getRight();
             numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
+            assertFalse(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -1375,11 +1431,13 @@ void 
validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress>
         repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
         repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, 
writeQuorumSize, writeQuorumSize, null,
-                new HashSet<>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                writeQuorumSize, null, new HashSet<>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.getRight();
         int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
minNumRacksPerWriteQuorum);
         assertEquals("minimum number of racks covered for writequorum 
ensemble: " + ensemble, ensembleSize, numCovered);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -1417,16 +1475,22 @@ public void testNewEnsembleWithEnoughRacks() throws 
Exception {
             int ensembleSize = 3;
             int writeQuorumSize = 3;
             int ackQuorumSize = 2;
-            List<BookieSocketAddress> ensemble1 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    ackQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy1 = 
ensembleResponse.getRight();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble1, writeQuorumSize, 
conf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy1);
             ensembleSize = 4;
             writeQuorumSize = 4;
-            List<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
-                    new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = 
ensembleResponse2.getRight();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, 
conf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -1497,11 +1561,16 @@ public void 
testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() t
         selectionCounts.put(addr3, 0L);
         selectionCounts.put(addr4, 0L);
         int numTries = 50000;
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+        boolean isEnsembleAdheringToPlacementPolicy;
         BookieSocketAddress replacedBookie;
         for (int i = 0; i < numTries; i++) {
             // replace node under r2
-            replacedBookie = repp.replaceBookie(1, 1, 1, null, new 
HashSet<>(), addr2, new HashSet<>());
+            replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new 
ArrayList<>(), addr2, new HashSet<>());
+            replacedBookie = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
             assertTrue("replaced : " + replacedBookie, 
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
             selectionCounts.put(replacedBookie, 
selectionCounts.get(replacedBookie) + 1);
         }
         double observedMultiple = ((double) selectionCounts.get(addr4) / 
(double) selectionCounts.get(addr3));
@@ -1557,14 +1626,19 @@ public void 
testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack(
         selectionCounts.put(addr3, 0L);
         selectionCounts.put(addr4, 0L);
         int numTries = 50000;
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
         BookieSocketAddress replacedBookie;
+        boolean isEnsembleAdheringToPlacementPolicy;
         for (int i = 0; i < numTries; i++) {
             // addr2 is on /r2 and this is the only one on this rack. So the 
replacement
             // will come from other racks. However, the weight should be 
honored in such
             // selections as well
-            replacedBookie = repp.replaceBookie(1, 1, 1, null, new 
HashSet<>(), addr2, new HashSet<>());
+            replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new 
ArrayList<>(), addr2, new HashSet<>());
+            replacedBookie = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = 
replaceBookieResponse.getRight();
             assertTrue(addr0.equals(replacedBookie) || 
addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
                     || addr4.equals(replacedBookie));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
             selectionCounts.put(replacedBookie, 
selectionCounts.get(replacedBookie) + 1);
         }
         /*
@@ -1656,6 +1730,7 @@ public void 
testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr
         int numTries = 10000;
 
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         int ensembleSize = 3;
         int writeQuorumSize = 2;
@@ -1664,7 +1739,8 @@ public void 
testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() thr
             // addr2 is on /r2 and this is the only one on this rack. So the 
replacement
             // will come from other racks. However, the weight should be 
honored in such
             // selections as well
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, null, excludeList);
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
             assertTrue(
                     "Rackaware selection not happening "
                             + getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
@@ -1726,21 +1802,23 @@ public void 
testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc
         bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
 
         repp.updateBookieInfo(bookieInfoMap);
-
-        List<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+        List<BookieSocketAddress> ensemble;
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
         try {
             excludeList.add(addr1);
             excludeList.add(addr2);
             excludeList.add(addr3);
             excludeList.add(addr4);
-            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies" + ensemble);
         } catch (BKNotEnoughBookiesException e) {
             // this is expected
         }
         try {
-            ensemble = repp.newEnsemble(1, 1, 1, null, excludeList);
+            ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
         } catch (BKNotEnoughBookiesException e) {
             fail("Should not throw BKNotEnoughBookiesException when there are 
enough bookies for the ensemble");
         }
@@ -1824,13 +1902,20 @@ public void testPlacementOnStabilizeNetworkTopology() 
throws Exception {
 
         // we will never use addr4 even it is in the stabilized network 
topology
         for (int i = 0; i < 5; i++) {
-            List<BookieSocketAddress> ensemble =
-                    repp.newEnsemble(3, 3, 3, null, new 
HashSet<BookieSocketAddress>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.getRight();
             assertFalse(ensemble.contains(addr4));
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie 
flapping
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, 
new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(4, 2, 2, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = 
ensembleResponse.getRight();
+        assertFalse(isEnsembleAdheringToPlacementPolicy);
         assertTrue(ensemble.contains(addr4));
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index d9f253507e..e0fd2bdffe 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,6 +30,8 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -44,6 +46,7 @@
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.Shell;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -111,7 +114,9 @@ public void testReplaceBookieWithEnoughBookiesInSameRack() 
throws Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
         assertEquals(addr3, replacedBookie);
     }
 
@@ -133,7 +138,9 @@ public void 
testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || 
addr4.equals(replacedBookie));
@@ -160,7 +167,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws 
Exception {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new 
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new 
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not BKNotEnoughBookiesException
@@ -197,7 +204,9 @@ public void testReplaceBookieWithScriptMappingError() 
throws Exception {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertFalse(addr2.equals(replacedBookie));
@@ -235,7 +244,9 @@ public void testReplaceBookieWithScriptMappingError2() 
throws Exception {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertFalse(addr2.equals(replacedBookie));
@@ -257,9 +268,13 @@ public void testNewEnsembleWithSingleRack() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -281,10 +296,14 @@ public void testNewEnsembleWithMultipleRacks() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
             assertTrue(numCovered == 2);
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
             assertTrue(numCovered == 2);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +334,13 @@ public void testNewEnsembleWithEnoughRacks() throws 
Exception {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 = 
repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
             assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception.");
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index 7dc1d39d1d..3192d049c1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -42,6 +42,7 @@
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,8 +105,9 @@ public void testNotifyRackChange() throws Exception {
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, 
writeQuorumSize, acqQuorumSize,
-                Collections.emptyMap(), Collections.emptySet());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(ensembleSize, writeQuorumSize,
+                acqQuorumSize, Collections.emptyMap(), Collections.emptySet());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum());
         assertTrue(numCovered >= 1 && numCovered < 3);
@@ -118,8 +120,9 @@ public void testNotifyRackChange() throws Exception {
         StaticDNSResolver.changeRack(bookieAddressList, rackList);
         numOfAvailableRacks = numOfAvailableRacks + 1;
         acqQuorumSize = 1;
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, Collections.emptyMap(),
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, 
acqQuorumSize, Collections.emptyMap(),
                 Collections.emptySet());
+        ensemble = ensembleResponse.getLeft();
         assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum()));
         assertTrue(ensemble.contains(addr1));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index e541230e35..8e4f10d5b3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -51,6 +51,7 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,8 +424,9 @@ public void 
testReplaceBookieWithEnoughBookiesInSameRegion() throws Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, 
new HashSet<BookieSocketAddress>(),
-                addr2, new HashSet<BookieSocketAddress>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null,
+                new ArrayList<BookieSocketAddress>(), addr2, new 
HashSet<BookieSocketAddress>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
         assertEquals(addr3, replacedBookie);
     }
 
@@ -449,8 +451,9 @@ public void 
testReplaceBookieWithEnoughBookiesInDifferentRegion() throws Excepti
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
-                new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(1, 1, 1, null,
+                new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || 
addr4.equals(replacedBookie));
@@ -475,7 +478,9 @@ public void testNewEnsembleBookieWithNotEnoughBookies() 
throws Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, 
new HashSet<BookieSocketAddress>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(5, 5, 3, null,
+                    new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> list = ensembleResponse.getLeft();
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -507,7 +512,7 @@ public void testReplaceBookieWithNotEnoughBookies() throws 
Exception {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new 
HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new 
ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -536,11 +541,13 @@ public void testNewEnsembleWithSingleRegion() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -569,8 +576,9 @@ public void testNewEnsembleWithMultipleRegions() throws 
Exception {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
             assertTrue(numCovered >= 1);
             assertTrue(numCovered < 3);
@@ -578,8 +586,9 @@ public void testNewEnsembleWithMultipleRegions() throws 
Exception {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
         try {
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -618,11 +627,13 @@ public void testNewEnsembleWithEnoughRegions() throws 
Exception {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 = 
repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
             assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = 
repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -669,23 +680,27 @@ public void testNewEnsembleWithThreeRegions() throws 
Exception {
         addrs.add(addr10);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(7, 7, 4, null, new 
HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(7, 7, 4, null, new 
HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 7);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(8, 8, 5, null, new 
HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(8, 8, 5, null, new 
HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 8);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(9, 9, 5, null, new 
HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(9, 9, 5, null, new 
HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 9);
@@ -738,8 +753,9 @@ public void testNewEnsembleWithThreeRegionsWithDisable() 
throws Exception {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ((SettableFeature) 
featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(2, getNumRegionsInEnsemble(ensemble));
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
@@ -753,16 +769,16 @@ public void testNewEnsembleWithThreeRegionsWithDisable() 
throws Exception {
         }
         try {
             ((SettableFeature) 
featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
-                    new HashSet<BookieSocketAddress>());
+            repp.newEnsemble(6, 6, 4, null, new 
HashSet<BookieSocketAddress>());
             fail("Should get not enough bookies exception even there is only 
one region with insufficient bookies.");
         } catch (BKNotEnoughBookiesException bnebe) {
             // Expected
         }
         try {
             ((SettableFeature) 
featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
             assert(ensemble.contains(addr4));
@@ -835,8 +851,9 @@ public void testNewEnsembleWithFiveRegions() throws 
Exception {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, 
null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(10, 10, 10, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -847,7 +864,9 @@ public void testNewEnsembleWithFiveRegions() throws 
Exception {
         try {
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
             excludedAddrs.add(addr10);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, 
null, excludedAddrs);
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(10, 10, 10, null,
+                    excludedAddrs);
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr11) && ensemble.contains(addr12));
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,9 +956,11 @@ public void 
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
             ackQuorum = 5;
         }
 
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new 
HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(6, 6, ackQuorum, null, new 
HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -960,9 +981,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int 
minDurability, boole
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
             for (BookieSocketAddress addr: region2Bookies) {
                 if (ensemble.contains(addr)) {
-                    BookieSocketAddress replacedBookie = repp.replaceBookie(
-                        6, 6, ackQuorum, null,
-                        new HashSet<>(ensemble), addr, excludedAddrs);
+                    Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(6, 6, ackQuorum, null,
+                            ensemble, addr, excludedAddrs);
+                    BookieSocketAddress replacedBookie = 
replaceBookieResponse.getLeft();
                     ensemble.remove(addr);
                     ensemble.add(replacedBookie);
                 }
@@ -986,9 +1007,9 @@ public void 
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
 
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(
-                    6, 6, ackQuorum, null,
-                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+                Pair<BookieSocketAddress, Boolean> replaceBookieResponse = 
repp.replaceBookie(6, 6, ackQuorum, null,
+                        ensemble, bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = 
replaceBookieResponse.getLeft();
                 assert (replacedBookie.equals(replacedBookieExpected));
                 assertEquals(3, getNumRegionsInEnsemble(ensemble));
             } catch (BKNotEnoughBookiesException bnebe) {
@@ -997,9 +1018,7 @@ public void 
testEnsembleWithThreeRegionsReplaceInternal(int minDurability, boole
 
             excludedAddrs.add(replacedBookieExpected);
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(
-                    6, 6, ackQuorum, null,
-                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+                repp.replaceBookie(6, 6, ackQuorum, null, ensemble, 
bookieToReplace, excludedAddrs);
                 if (minDurability > 1 && 
!disableDurabilityFeature.isAvailable()) {
                     fail("Should throw BKNotEnoughBookiesException when there 
is not enough bookies");
                 }
@@ -1073,9 +1092,11 @@ public void testEnsembleDurabilityDisabledInternal(int 
minDurability, boolean di
                     .set(true);
         }
 
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, 4, null, new 
HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(6, 6, 4, null, new 
HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 6);
         } catch (BKNotEnoughBookiesException bnebe) {
             LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -1086,9 +1107,7 @@ public void testEnsembleDurabilityDisabledInternal(int 
minDurability, boolean di
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
 
         try {
-            repp.replaceBookie(
-                6, 6, 4, null,
-                new HashSet<>(ensemble), addr4, excludedAddrs);
+            repp.replaceBookie(6, 6, 4, null, ensemble, ensemble.get(2), 
excludedAddrs);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
@@ -1141,8 +1160,8 @@ public void testNewEnsembleFailWithFiveRegions() throws 
Exception {
         excludedAddrs.add(addr10);
         excludedAddrs.add(addr9);
         try {
-            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, 
excludedAddrs);
-            LOG.info("Ensemble : {}", list);
+            Pair<List<BookieSocketAddress>, Boolean> list = 
repp.newEnsemble(5, 5, 5, null, excludedAddrs);
+            LOG.info("Ensemble : {}", list.getLeft());
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -1201,8 +1220,9 @@ public void 
testBasicReorderReadLACSequenceWithLocalRegion() throws Exception {
 
     private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, 
boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
-
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1278,9 @@ public void 
testBasicReorderReadLACSequenceWithRemoteRegion() throws Exception {
     private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, 
boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1328,7 +1350,9 @@ private void 
reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(boolean isR
 
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = 
repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to