This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7f8c31bdad New ensemble choose different rack first. (#3721)
7f8c31bdad is described below
commit 7f8c31bdad3bf7261e795feafd7f969f871baa82
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Feb 17 10:12:49 2023 +0800
New ensemble choose different rack first. (#3721)
Descriptions of the changes in this PR:
Fixes #3720
### Motivation
When creating a new ensemble, choose nodes from different racks first; if no
more nodes on different racks are available, fall back to picking nodes randomly.
---
.../RackawareEnsemblePlacementPolicyImpl.java | 24 ++++++---
.../TestRackawareEnsemblePlacementPolicy.java | 48 +++++++++++++++++-
.../TestRegionAwareEnsemblePlacementPolicy.java | 58 ++++++++++++++++++++++
3 files changed, 123 insertions(+), 7 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 2db47e17c4..9f3b6fba36 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -414,9 +414,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
}
return PlacementResult.of(addrs,
PlacementPolicyAdherence.FAIL);
}
-
+ //Choose different rack nodes.
+ String curRack = null;
for (int i = 0; i < ensembleSize; i++) {
- String curRack;
if (null == prevNode) {
if ((null == localNode) ||
defaultRack.equals(localNode.getNetworkLocation())) {
curRack = NodeBase.ROOT;
@@ -424,11 +424,23 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
curRack = localNode.getNetworkLocation();
}
} else {
- curRack = "~" + prevNode.getNetworkLocation();
+ if (!curRack.startsWith("~")) {
+ curRack = "~" + prevNode.getNetworkLocation();
+ } else {
+ curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR
+ prevNode.getNetworkLocation();
+ }
+ }
+ try {
+ prevNode = selectRandomFromRack(curRack, excludeNodes,
ensemble, ensemble);
+ } catch (BKNotEnoughBookiesException e) {
+ if (!curRack.equals(NodeBase.ROOT)) {
+ curRack = NodeBase.ROOT;
+ prevNode = selectFromNetworkLocation(curRack,
excludeNodes, ensemble, ensemble,
+ !enforceMinNumRacksPerWriteQuorum || prevNode
== null);
+ } else {
+ throw e;
+ }
}
- boolean firstBookieInTheEnsemble = (null == prevNode);
- prevNode = selectFromNetworkLocation(curRack, excludeNodes,
ensemble, ensemble,
- !enforceMinNumRacksPerWriteQuorum ||
firstBookieInTheEnsemble);
}
List<BookieId> bookieList = ensemble.toList();
if (ensembleSize != bookieList.size()) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 396e86d92b..7d843fd16d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -1465,7 +1465,6 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
public boolean validate() {
return false;
}
-
}, false);
fail("Should get not enough bookies exception since ensemble
rejects all the nodes");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -1519,6 +1518,53 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
}
}
+ @Test
+ public void testNewEnsembleWithPickDifferentRack() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r3");
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ int ensembleSize = 3;
+ int writeQuorumSize = 3;
+ int ackQuorumSize = 2;
+
+ Set<BookieId> excludeBookies = new HashSet<>();
+
+ for (int i = 0; i < 50; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ if (ensemble.contains(addr1.toBookieId()) &&
ensemble.contains(addr2.toBookieId())) {
+ fail("addr1 and addr2 is same rack.");
+ }
+ }
+
+ //addr4 shutdown.
+ addrs.remove(addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+ for (int i = 0; i < 50; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ assertTrue(ensemble.contains(addr1.toBookieId()) &&
ensemble.contains(addr2.toBookieId()));
+ }
+ }
+
@Test
public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
int numOfRacksToCreate = 6;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 7fe5b83db4..52b9a8a63b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -605,6 +605,64 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
}
+ @Test
+ public void testNewEnsembleWithPickDifferentRack() throws Exception {
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(2);
+ clientConf.setEnforceMinNumFaultDomainsForWrite(false);
+ repp.uninitalize();
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(),
timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region-1/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region-1/r1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region-1/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region-1/r3");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region-2/r1");
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ int ensembleSize = 4;
+ int writeQuorumSize = 4;
+ int ackQuorumSize = 2;
+
+ Set<BookieId> excludeBookies = new HashSet<>();
+
+ for (int i = 0; i < 50; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ if (ensemble.contains(addr1.toBookieId()) &&
ensemble.contains(addr2.toBookieId())) {
+ fail("addr1 and addr2 is same rack.");
+ }
+ }
+
+ //addr4 shutdown.
+ addrs.remove(addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+ for (int i = 0; i < 50; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ assertTrue(ensemble.contains(addr1.toBookieId()) &&
ensemble.contains(addr2.toBookieId()));
+ }
+ }
+
@Test
public void testNewEnsembleWithEnoughRegions() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);