sijie closed pull request #887: make replaceBookie() exclude ensemble bookie racks
URL: https://github.com/apache/bookkeeper/pull/887
This is a PR merged from a forked repository.
Because GitHub hides the original diff when a pull request is merged, the diff is
reproduced below for the sake of provenance.
As this is a foreign pull request (from a fork), the diff would not otherwise be
shown, so it is supplied here:
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 7c6bad93d..041455327 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -21,7 +21,6 @@
import io.netty.util.HashedWheelTimer;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
@@ -82,7 +81,7 @@ public void uninitalize() {
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- java.util.Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
throw new BKNotEnoughBookiesException();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 85e858167..7d13100e4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -33,6 +33,7 @@
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -992,7 +993,7 @@ private void asyncRecoverLedgerFragment(final LedgerHandle
lh,
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata(),
- ensemble,
+ new HashSet<>(ensemble),
oldBookie,
bookiesToExclude);
targetBookieAddresses.put(bookieIndex, newBookie);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 04c62e2f8..3ecf94726 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -201,7 +201,7 @@ public void initialBlockingBookieRead() throws BKException {
log.debug("Not enough healthy bookies available, using
quarantined bookies");
}
return placementPolicy.newEnsemble(
- ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
Collections.emptySet());
+ ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
new HashSet<>());
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 219086df4..760249961 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -114,7 +114,7 @@
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
excludeBookies.addAll(currentEnsemble);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 7656a4960..8dab4eaed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -285,7 +285,7 @@ BookieSocketAddress replaceBookie(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Map<String, byte[]> customMetadata,
- Collection<BookieSocketAddress>
currentEnsemble,
+ Set<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1691f1ec1..4359e1670 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -20,7 +20,6 @@
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -102,7 +101,7 @@ public void uninitalize() {
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 735324bca..24460d53b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -31,7 +31,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -435,6 +434,14 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
return nodes;
}
+ private static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
+ Set<String> networkLocs = new HashSet<>();
+ for (Node bookieNode : bookieNodes) {
+ networkLocs.add(bookieNode.getNetworkLocation());
+ }
+ return networkLocs;
+ }
+
@Override
public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
excludeBookies)
@@ -532,7 +539,7 @@ public void
handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKNotEnoughBookiesException {
rwLock.readLock().lock();
@@ -543,16 +550,23 @@ public BookieSocketAddress replaceBookie(int
ensembleSize, int writeQuorumSize,
bn = createBookieNode(bookieToReplace);
}
+ Set<Node> ensembleNodes = convertBookiesToNodes(currentEnsemble);
Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
- // add the bookie to replace in exclude set
+
+ excludeNodes.addAll(ensembleNodes);
excludeNodes.add(bn);
+ ensembleNodes.remove(bn);
+
+ Set<String> networkLocationsToBeExcluded =
getNetworkLocations(ensembleNodes);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Try to choose a new bookie to replace {}, excluding
{}.", bookieToReplace,
- excludeNodes);
+ LOG.debug("Try to choose a new bookie to replace {} from
ensemble {}, excluding {}.",
+ bookieToReplace, ensembleNodes, excludeNodes);
}
// pick a candidate from same rack to replace
BookieNode candidate = selectFromNetworkLocation(
bn.getNetworkLocation(),
+ networkLocationsToBeExcluded,
excludeNodes,
TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE);
@@ -610,8 +624,57 @@ public BookieNode selectFromNetworkLocation(
}
}
- protected String getRemoteRack(BookieNode node) {
- return "~" + node.getNetworkLocation();
+ protected BookieNode selectFromNetworkLocation(String networkLoc,
+ Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode>
predicate,
+ Ensemble<BookieNode>
ensemble)
+ throws BKNotEnoughBookiesException {
+ // first attempt to select one from local rack
+ try {
+ return selectRandomFromRack(networkLoc, excludeBookies, predicate,
ensemble);
+ } catch (BKNotEnoughBookiesException e) {
+ if (isWeighted) {
+ // if weight based selection is enabled, randomly select one
from the whole cluster
+ // based on weights and ignore the provided
<tt>excludeRacks</tt>.
+ // randomly choose one from whole cluster, ignore the provided
predicate.
+ return selectRandom(1, excludeBookies, predicate,
ensemble).get(0);
+ } else {
+ // if weight based selection is disabled, and there is no
enough bookie from local rack,
+ // select bookies from the whole cluster and exclude the racks
specified at <tt>excludeRacks</tt>.
+ return selectFromNetworkLocation(excludeRacks, excludeBookies,
predicate, ensemble);
+ }
+ }
+
+ }
+
+
+ /**
+ * It randomly selects a {@link BookieNode} that is not on the
<i>excludeRacks</i> set, excluding the nodes in
+ * <i>excludeBookies</i> set. If it fails to find one, it selects a random
{@link BookieNode} from the whole
+ * cluster.
+ */
+ protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode>
predicate,
+ Ensemble<BookieNode>
ensemble)
+ throws BKNotEnoughBookiesException {
+ List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
+ Collections.shuffle(knownNodes);
+
+ for (BookieNode knownNode : knownNodes) {
+ if (excludeBookies.contains(knownNode)) {
+ continue;
+ }
+ if (excludeRacks.contains(knownNode.getNetworkLocation())) {
+ continue;
+ }
+ return knownNode;
+ }
+ LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose
bookie randomly from the cluster.",
+ excludeBookies);
+ // randomly choose one from whole cluster
+ return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
}
private WeightedRandomSelection<BookieNode>
prepareForWeightedSelection(List<Node> leaves) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index f990b3f24..0b804e768 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -20,7 +20,6 @@
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -414,7 +413,7 @@ public RegionAwareEnsemblePlacementPolicy
initialize(ClientConfiguration conf,
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ Map<String, byte[]> customMetadata, Set<BookieSocketAddress>
currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
rwLock.readLock().lock();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index d8cea7464..91ece37cf 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -24,7 +24,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,7 +58,7 @@ public GenericEnsemblePlacementPolicyTest() {
@Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize,
- int ackQuorumSize, Map<String, byte[]> customMetadata,
Collection<BookieSocketAddress> currentEnsemble,
+ int ackQuorumSize, Map<String, byte[]> customMetadata,
Set<BookieSocketAddress> currentEnsemble,
BookieSocketAddress bookieToReplace, Set<BookieSocketAddress>
excludeBookies)
throws BKException.BKNotEnoughBookiesException {
new Exception("replaceBookie " + ensembleSize + "," +
customMetadata).printStackTrace();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 637df1b19..01a6617fd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -455,6 +455,36 @@ public void testReplaceBookieWithNotEnoughBookies() throws
Exception {
}
}
+ @Test
+ public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble()
throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
NetworkTopology.DEFAULT_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/r3");
+ // Update cluster
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+ // replace node under r2
+ Set<BookieSocketAddress> ensembleBookies = new
HashSet<BookieSocketAddress>();
+ ensembleBookies.add(addr2);
+ ensembleBookies.add(addr4);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(
+ 1, 1, 1 , null,
+ ensembleBookies,
+ addr4,
+ new HashSet<>());
+ assertEquals(addr1, replacedBookie);
+ }
+
@Test
public void testNewEnsembleWithSingleRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.6", 3181);
@@ -617,7 +647,7 @@ public void
testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() t
for (int i = 0; i < numTries; i++) {
// replace node under r2
replacedBookie = repp.replaceBookie(1, 1, 1, null, new
HashSet<>(), addr2, new HashSet<>());
- assertTrue(addr3.equals(replacedBookie) ||
addr4.equals(replacedBookie));
+ assertTrue("replaced : " + replacedBookie,
addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
selectionCounts.put(replacedBookie,
selectionCounts.get(replacedBookie) + 1);
}
double observedMultiple = ((double) selectionCounts.get(addr4) /
(double) selectionCounts.get(addr3));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index fe75b3105..ed8cf2960 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -960,8 +960,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int
minDurability, boole
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
for (BookieSocketAddress addr: region2Bookies) {
if (ensemble.contains(addr)) {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6,
6, ackQuorum, null, ensemble,
- addr, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(
+ 6, 6, ackQuorum, null,
+ new HashSet<>(ensemble), addr, excludedAddrs);
ensemble.remove(addr);
ensemble.add(replacedBookie);
}
@@ -985,8 +986,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int
minDurability, boole
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6,
ackQuorum, null, ensemble,
- bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(
+ 6, 6, ackQuorum, null,
+ new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
assert (replacedBookie.equals(replacedBookieExpected));
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -995,8 +997,9 @@ public void testEnsembleWithThreeRegionsReplaceInternal(int
minDurability, boole
excludedAddrs.add(replacedBookieExpected);
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6,
ackQuorum, null, ensemble,
- bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(
+ 6, 6, ackQuorum, null,
+ new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
if (minDurability > 1 &&
!disableDurabilityFeature.isAvailable()) {
fail("Should throw BKNotEnoughBookiesException when there
is not enough bookies");
}
@@ -1083,7 +1086,9 @@ public void testEnsembleDurabilityDisabledInternal(int
minDurability, boolean di
Set<BookieSocketAddress> excludedAddrs = new
HashSet<BookieSocketAddress>();
try {
- repp.replaceBookie(6, 6, 4, null, ensemble, addr4, excludedAddrs);
+ repp.replaceBookie(
+ 6, 6, 4, null,
+ new HashSet<>(ensemble), addr4, excludedAddrs);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is
only one rack.");
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services