sijie closed pull request #887: make replaceBookie() exclude ensemble bookie racks
URL: https://github.com/apache/bookkeeper/pull/887
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 7c6bad93d..041455327 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -21,7 +21,6 @@
 import io.netty.util.HashedWheelTimer;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
@@ -82,7 +81,7 @@ public void uninitalize() {
 
     @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-        java.util.Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
         BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         throw new BKNotEnoughBookiesException();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 85e858167..7d13100e4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -33,6 +33,7 @@
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -992,7 +993,7 @@ private void asyncRecoverLedgerFragment(final LedgerHandle 
lh,
                             lh.getLedgerMetadata().getWriteQuorumSize(),
                             lh.getLedgerMetadata().getAckQuorumSize(),
                             lh.getLedgerMetadata().getCustomMetadata(),
-                            ensemble,
+                            new HashSet<>(ensemble),
                             oldBookie,
                             bookiesToExclude);
             targetBookieAddresses.put(bookieIndex, newBookie);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 04c62e2f8..3ecf94726 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -201,7 +201,7 @@ public void initialBlockingBookieRead() throws BKException {
                 log.debug("Not enough healthy bookies available, using 
quarantined bookies");
             }
             return placementPolicy.newEnsemble(
-                ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, 
Collections.emptySet());
+                ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, 
new HashSet<>());
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 219086df4..760249961 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -114,7 +114,7 @@
 
     @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         excludeBookies.addAll(currentEnsemble);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 7656a4960..8dab4eaed 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -285,7 +285,7 @@ BookieSocketAddress replaceBookie(int ensembleSize,
                                       int writeQuorumSize,
                                       int ackQuorumSize,
                                       Map<String, byte[]> customMetadata,
-                                      Collection<BookieSocketAddress> 
currentEnsemble,
+                                      Set<BookieSocketAddress> currentEnsemble,
                                       BookieSocketAddress bookieToReplace,
                                       Set<BookieSocketAddress> excludeBookies)
         throws BKNotEnoughBookiesException;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1691f1ec1..4359e1670 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -20,7 +20,6 @@
 import io.netty.util.HashedWheelTimer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -102,7 +101,7 @@ public void uninitalize() {
 
     @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
        try {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 735324bca..24460d53b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -31,7 +31,6 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -435,6 +434,14 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
         return nodes;
     }
 
+    /**
+     * Collects the network locations (racks) of the given bookie nodes.
+     *
+     * @param bookieNodes nodes whose {@link Node#getNetworkLocation()} values are gathered
+     * @return a new mutable set holding each distinct network location
+     */
+    private static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
+        Set<String> locations = new HashSet<>();
+        bookieNodes.forEach(node -> locations.add(node.getNetworkLocation()));
+        return locations;
+    }
+
     @Override
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
@@ -532,7 +539,7 @@ public void 
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
 
     @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
@@ -543,16 +550,23 @@ public BookieSocketAddress replaceBookie(int 
ensembleSize, int writeQuorumSize,
                 bn = createBookieNode(bookieToReplace);
             }
 
+            Set<Node> ensembleNodes = convertBookiesToNodes(currentEnsemble);
             Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
-            // add the bookie to replace in exclude set
+
+            excludeNodes.addAll(ensembleNodes);
             excludeNodes.add(bn);
+            ensembleNodes.remove(bn);
+
+            Set<String> networkLocationsToBeExcluded = 
getNetworkLocations(ensembleNodes);
+
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Try to choose a new bookie to replace {}, excluding 
{}.", bookieToReplace,
-                        excludeNodes);
+                LOG.debug("Try to choose a new bookie to replace {} from 
ensemble {}, excluding {}.",
+                    bookieToReplace, ensembleNodes, excludeNodes);
             }
             // pick a candidate from same rack to replace
             BookieNode candidate = selectFromNetworkLocation(
                     bn.getNetworkLocation(),
+                    networkLocationsToBeExcluded,
                     excludeNodes,
                     TruePredicate.INSTANCE,
                     EnsembleForReplacementWithNoConstraints.INSTANCE);
@@ -610,8 +624,57 @@ public BookieNode selectFromNetworkLocation(
         }
     }
 
-    protected String getRemoteRack(BookieNode node) {
-        return "~" + node.getNetworkLocation();
+    /**
+     * Selects one bookie, trying the rack named by <i>networkLoc</i> first.
+     *
+     * <p>If the local rack yields no bookie, the fallback depends on weighted placement:
+     * <ul>
+     *   <li>enabled: one bookie is drawn by weight from the whole cluster via {@code selectRandom};
+     *       <i>excludeRacks</i> is ignored, but <i>predicate</i> and <i>ensemble</i> are still passed on;</li>
+     *   <li>disabled: a bookie is chosen from any rack not listed in <i>excludeRacks</i> (see the
+     *       four-argument overload).</li>
+     * </ul>
+     *
+     * @param networkLoc rack to try first
+     * @param excludeRacks racks to skip on the non-weighted fallback path
+     * @param excludeBookies bookies that must not be chosen
+     * @param predicate filter passed through to the selection helpers
+     * @param ensemble ensemble passed through to the selection helpers
+     * @return the selected bookie
+     * @throws BKNotEnoughBookiesException propagated from the selection helpers when no bookie can be chosen
+     */
+    protected BookieNode selectFromNetworkLocation(String networkLoc,
+                                                   Set<String> excludeRacks,
+                                                   Set<Node> excludeBookies,
+                                                   Predicate<BookieNode> predicate,
+                                                   Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        // first attempt to select one from the local rack
+        try {
+            return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKNotEnoughBookiesException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No eligible bookie in local rack {}, falling back to other racks.", networkLoc);
+            }
+            if (isWeighted) {
+                // weight-based selection is enabled: pick one bookie from the whole cluster by
+                // weight and ignore <tt>excludeRacks</tt>. Note that predicate and ensemble are
+                // still handed to selectRandom, so they are NOT ignored on this path.
+                return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
+            } else {
+                // weight-based selection is disabled and the local rack has no eligible bookie:
+                // pick from the whole cluster, skipping the racks listed in <tt>excludeRacks</tt>.
+                return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble);
+            }
+        }
+    }
+
+
+    /**
+     * Randomly selects a {@link BookieNode} whose rack is not in <i>excludeRacks</i>. It skips nodes in
+     * <i>excludeBookies</i> and nodes rejected by <i>predicate</i>, and records the chosen node in
+     * <i>ensemble</i>. If no such node exists, it falls back to
+     * {@code selectRandom(1, excludeBookies, predicate, ensemble)} over the whole cluster, which
+     * ignores <i>excludeRacks</i>.
+     *
+     * @param excludeRacks network locations whose bookies are skipped on the first pass
+     * @param excludeBookies bookies that must not be chosen
+     * @param predicate filter a candidate must pass; evaluated against <i>ensemble</i>
+     * @param ensemble ensemble the chosen node is added to
+     * @return the selected bookie
+     * @throws BKNotEnoughBookiesException propagated from {@code selectRandom} when the fallback finds no bookie
+     */
+    protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
+                                                   Set<Node> excludeBookies,
+                                                   Predicate<BookieNode> predicate,
+                                                   Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
+        // shuffle so that the first acceptable node is a random choice
+        Collections.shuffle(knownNodes);
+
+        for (BookieNode knownNode : knownNodes) {
+            if (excludeBookies.contains(knownNode)) {
+                continue;
+            }
+            if (excludeRacks.contains(knownNode.getNetworkLocation())) {
+                continue;
+            }
+            // apply the caller's predicate and record the pick in the ensemble; previously both
+            // parameters were only honored on the selectRandom fallback below
+            if (!predicate.apply(knownNode, ensemble)) {
+                continue;
+            }
+            ensemble.addNode(knownNode);
+            return knownNode;
+        }
+        LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
+                excludeBookies);
+        // randomly choose one from whole cluster
+        return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
     }
 
     private WeightedRandomSelection<BookieNode> 
prepareForWeightedSelection(List<Node> leaves) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index f990b3f24..0b804e768 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -20,7 +20,6 @@
 import io.netty.util.HashedWheelTimer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -414,7 +413,7 @@ public RegionAwareEnsemblePlacementPolicy 
initialize(ClientConfiguration conf,
 
     @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         rwLock.readLock().lock();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index d8cea7464..91ece37cf 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -24,7 +24,6 @@
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +58,7 @@ public GenericEnsemblePlacementPolicyTest() {
 
         @Override
         public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize,
-            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Collection<BookieSocketAddress> currentEnsemble,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, 
Set<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
             new Exception("replaceBookie " + ensembleSize + "," + 
customMetadata).printStackTrace();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 637df1b19..01a6617fd 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -455,6 +455,36 @@ public void testReplaceBookieWithNotEnoughBookies() throws 
Exception {
         }
     }
 
+    /**
+     * Replacing the only /r3 bookie must skip the rack still used by the rest of the ensemble (/r2).
+     * The local rack /r3 has no other bookie and /r2 hosts the remaining ensemble member, so the only
+     * eligible replacement is the default-rack bookie. Assumes weighted placement is off for this
+     * fixture (the non-weighted fallback applies the rack exclusion) — confirm if repp setup changes.
+     */
+    @Test
+    public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        // update dns mapping: addr1 on the default rack, addr2/addr3 on /r2, addr4 alone on /r3
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/r3");
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        // replace the node under /r3 (addr4) in an ensemble that also uses /r2 (addr2)
+        Set<BookieSocketAddress> ensembleBookies = new HashSet<BookieSocketAddress>();
+        ensembleBookies.add(addr2);
+        ensembleBookies.add(addr4);
+        BookieSocketAddress replacedBookie = repp.replaceBookie(
+            1, 1, 1, null,
+            ensembleBookies,
+            addr4,
+            new HashSet<>());
+        assertEquals(addr1, replacedBookie);
+    }
+
     @Test
     public void testNewEnsembleWithSingleRack() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.6", 3181);
@@ -617,7 +647,7 @@ public void 
testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() t
         for (int i = 0; i < numTries; i++) {
             // replace node under r2
             replacedBookie = repp.replaceBookie(1, 1, 1, null, new 
HashSet<>(), addr2, new HashSet<>());
-            assertTrue(addr3.equals(replacedBookie) || 
addr4.equals(replacedBookie));
+            assertTrue("replaced : " + replacedBookie, 
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
             selectionCounts.put(replacedBookie, 
selectionCounts.get(replacedBookie) + 1);
         }
         double observedMultiple = ((double) selectionCounts.get(addr4) / 
(double) selectionCounts.get(addr3));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index fe75b3105..ed8cf2960 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -960,8 +960,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int 
minDurability, boole
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
             for (BookieSocketAddress addr: region2Bookies) {
                 if (ensemble.contains(addr)) {
-                    BookieSocketAddress replacedBookie = repp.replaceBookie(6, 
6, ackQuorum, null, ensemble,
-                            addr, excludedAddrs);
+                    BookieSocketAddress replacedBookie = repp.replaceBookie(
+                        6, 6, ackQuorum, null,
+                        new HashSet<>(ensemble), addr, excludedAddrs);
                     ensemble.remove(addr);
                     ensemble.add(replacedBookie);
                 }
@@ -985,8 +986,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int 
minDurability, boole
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
 
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, 
ackQuorum, null, ensemble,
-                        bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = repp.replaceBookie(
+                    6, 6, ackQuorum, null,
+                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
                 assert (replacedBookie.equals(replacedBookieExpected));
                 assertEquals(3, getNumRegionsInEnsemble(ensemble));
             } catch (BKNotEnoughBookiesException bnebe) {
@@ -995,8 +997,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int 
minDurability, boole
 
             excludedAddrs.add(replacedBookieExpected);
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, 
ackQuorum, null, ensemble,
-                        bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = repp.replaceBookie(
+                    6, 6, ackQuorum, null,
+                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
                 if (minDurability > 1 && 
!disableDurabilityFeature.isAvailable()) {
                     fail("Should throw BKNotEnoughBookiesException when there 
is not enough bookies");
                 }
@@ -1083,7 +1086,9 @@ public void testEnsembleDurabilityDisabledInternal(int 
minDurability, boolean di
         Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
 
         try {
-            repp.replaceBookie(6, 6, 4, null, ensemble, addr4, excludedAddrs);
+            repp.replaceBookie(
+                6, 6, 4, null,
+                new HashSet<>(ensemble), addr4, excludedAddrs);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to