This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 2381d9b65a Fix the problem that
RegionAwareEnsemblePlacementPolicy.newEnsemble sometimes fails. (#3725)
2381d9b65a is described below
commit 2381d9b65a37b6e58ffc9c7be2aba35fd37874b7
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Feb 17 15:09:36 2023 +0800
Fix the problem that RegionAwareEnsemblePlacementPolicy.newEnsemble
sometimes fails. (#3725)
Descriptions of the changes in this PR:
Fixes #3722
### Motivation
See the analysis in the linked comment on issue
[#3722](https://github.com/apache/bookkeeper/issues/3722#issuecomment-1369859251).
---
.../RackawareEnsemblePlacementPolicyImpl.java | 7 +-
.../ZoneawareEnsemblePlacementPolicyImpl.java | 2 +-
.../apache/bookkeeper/net/NetworkTopologyImpl.java | 7 +-
.../TestRackawareEnsemblePlacementPolicy.java | 124 +++++++++++++++++++++
.../TestRegionAwareEnsemblePlacementPolicy.java | 61 ++++++++++
5 files changed, 194 insertions(+), 7 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 9f3b6fba36..376c5831a1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -430,13 +430,14 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR
+ prevNode.getNetworkLocation();
}
}
+ boolean firstBookieInTheEnsemble = (null == prevNode);
try {
prevNode = selectRandomFromRack(curRack, excludeNodes,
ensemble, ensemble);
} catch (BKNotEnoughBookiesException e) {
if (!curRack.equals(NodeBase.ROOT)) {
curRack = NodeBase.ROOT;
prevNode = selectFromNetworkLocation(curRack,
excludeNodes, ensemble, ensemble,
- !enforceMinNumRacksPerWriteQuorum || prevNode
== null);
+ !enforceMinNumRacksPerWriteQuorum ||
firstBookieInTheEnsemble);
} else {
throw e;
}
@@ -1185,7 +1186,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
curRack = localNode.getNetworkLocation();
}
} else {
- curRack = "~" + prevNode.getNetworkLocation();
+ curRack = NetworkTopologyImpl.INVERSE +
prevNode.getNetworkLocation();
}
try {
prevNode = replaceToAdherePlacementPolicyInternal(
@@ -1252,7 +1253,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
// avoid additional replace from write quorum candidates by
preExcludeRacks and postExcludeRacks
// avoid to use first candidate bookies for election by
provisionalEnsembleNodes
conditionList.add(Pair.of(
- "~" + String.join(",",
+ NetworkTopologyImpl.INVERSE + String.join(",",
Stream.concat(preExcludeRacks.stream(),
postExcludeRacks.stream()).collect(Collectors.toSet())),
provisionalEnsembleNodes
));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java
index 598b4a63a0..1ce04c4be3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java
@@ -616,7 +616,7 @@ public class ZoneawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
if (excludeZones.isEmpty()) {
return "";
}
- StringBuilder excludedZonesString = new StringBuilder("~");
+ StringBuilder excludedZonesString = new
StringBuilder(NetworkTopologyImpl.INVERSE);
boolean firstZone = true;
for (String excludeZone : excludeZones) {
if (!firstZone) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
index 8397437492..d4f0c4fd03 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -45,6 +45,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
public static final int DEFAULT_HOST_LEVEL = 2;
public static final Logger LOG =
LoggerFactory.getLogger(NetworkTopologyImpl.class);
public static final String NODE_SEPARATOR = ",";
+ public static final String INVERSE = "~";
/**
* A marker for an InvalidTopology Exception.
@@ -708,7 +709,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
public Node chooseRandom(String scope) {
netlock.readLock().lock();
try {
- if (scope.startsWith("~")) {
+ if (scope.startsWith(INVERSE)) {
return chooseRandom(NodeBase.ROOT, scope.substring(1));
} else {
return chooseRandom(scope, null);
@@ -774,7 +775,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
public Set<Node> getLeaves(String scope) {
netlock.readLock().lock();
try {
- if (scope.startsWith("~")) {
+ if (scope.startsWith(INVERSE)) {
Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
String[] excludeScopes =
scope.substring(1).split(NODE_SEPARATOR);
Set<Node> excludeNodes = new HashSet<Node>();
@@ -794,7 +795,7 @@ public class NetworkTopologyImpl implements NetworkTopology
{
@Override
public int countNumOfAvailableNodes(String scope, Collection<Node>
excludedNodes) {
boolean isExcluded = false;
- if (scope.startsWith("~")) {
+ if (scope.startsWith(INVERSE)) {
isExcluded = true;
scope = scope.substring(1);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 7d843fd16d..95a7d5b40d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -1518,6 +1518,130 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
}
}
+ //see: https://github.com/apache/bookkeeper/issues/3722
+ @Test
+ public void testNewEnsembleWithMultipleRacksWithCommonRack() throws
Exception {
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ clientConf.setMinNumRacksPerWriteQuorum(3);
+ repp.uninitalize();
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(),
timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+ BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10",
3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr6.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr7.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr8.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr9.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr10.getHostName(),
"/default-region/r3");
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+ addrs.add(addr6.toBookieId());
+ addrs.add(addr7.toBookieId());
+ addrs.add(addr8.toBookieId());
+ addrs.add(addr9.toBookieId());
+ addrs.add(addr10.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ try {
+ int ensembleSize = 10;
+ int writeQuorumSize = 10;
+ int ackQuorumSize = 2;
+
+ for (int i = 0; i < 50; ++i) {
+ Set<BookieId> excludeBookies = new HashSet<>();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ }
+ } catch (Exception e) {
+ fail("Can not new ensemble selection succeed");
+ }
+ }
+
+ @Test
+ public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws
Exception {
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ clientConf.setMinNumRacksPerWriteQuorum(3);
+ repp.uninitalize();
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(),
timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+ BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10",
3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr6.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr7.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr8.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr9.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr10.getHostName(),
"/default-region/r2");
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+ addrs.add(addr6.toBookieId());
+ addrs.add(addr7.toBookieId());
+ addrs.add(addr8.toBookieId());
+ addrs.add(addr9.toBookieId());
+ addrs.add(addr10.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ try {
+ int ensembleSize = 10;
+ int writeQuorumSize = 10;
+ int ackQuorumSize = 2;
+
+ Set<BookieId> excludeBookies = new HashSet<>();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ fail("Can not new ensemble selection succeed");
+ } catch (Exception e) {
+ assertTrue(e instanceof BKNotEnoughBookiesException);
+ }
+ }
+
@Test
public void testNewEnsembleWithPickDifferentRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 52b9a8a63b..ba9c4f862f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -705,6 +705,67 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
}
+ @Test
+ public void testNewEnsembleWithMultipleRacksWithCommonRack() throws
Exception {
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ clientConf.setMinNumRacksPerWriteQuorum(3);
+ repp.uninitalize();
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(),
timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+ BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+ BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+ BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10",
3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/region1/r2");
+ StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/region1/r3");
+ StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/region2/r1");
+ StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/region3/r1");
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+ addrs.add(addr6.toBookieId());
+ addrs.add(addr7.toBookieId());
+ addrs.add(addr8.toBookieId());
+ addrs.add(addr9.toBookieId());
+ addrs.add(addr10.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ try {
+ int ensembleSize = 10;
+ int writeQuorumSize = 10;
+ int ackQuorumSize = 2;
+
+ for (int i = 0; i < 50; ++i) {
+ Set<BookieId> excludeBookies = new HashSet<>();
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ }
+ } catch (Exception e) {
+ fail("RegionAwareEnsemblePlacementPolicy should newEnsemble
succeed.");
+ }
+ }
+
@Test
public void testNewEnsembleWithThreeRegions() throws Exception {
repp.uninitalize();