This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 069b51f2982f71f1b689322c3ca7bd4ad1ebff2d Author: Yan Zhao <[email protected]> AuthorDate: Fri Feb 17 10:12:49 2023 +0800 New ensemble choose different rack first. (#3721) Descriptions of the changes in this PR: Fixes #3720 ### Motivation When creating a new ensemble, choose nodes from different racks first; if no more nodes on different racks are available, fall back to picking nodes randomly. (cherry picked from commit 7f8c31bdad3bf7261e795feafd7f969f871baa82) --- .../RackawareEnsemblePlacementPolicyImpl.java | 24 ++++++--- .../TestRackawareEnsemblePlacementPolicy.java | 48 +++++++++++++++++- .../TestRegionAwareEnsemblePlacementPolicy.java | 58 ++++++++++++++++++++++ 3 files changed, 123 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 232e609138..b4e6fc23e1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -414,9 +414,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP } return PlacementResult.of(addrs, PlacementPolicyAdherence.FAIL); } - + //Choose different rack nodes. 
+ String curRack = null; for (int i = 0; i < ensembleSize; i++) { - String curRack; if (null == prevNode) { if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) { curRack = NodeBase.ROOT; @@ -424,11 +424,23 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP curRack = localNode.getNetworkLocation(); } } else { - curRack = "~" + prevNode.getNetworkLocation(); + if (!curRack.startsWith("~")) { + curRack = "~" + prevNode.getNetworkLocation(); + } else { + curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR + prevNode.getNetworkLocation(); + } + } + try { + prevNode = selectRandomFromRack(curRack, excludeNodes, ensemble, ensemble); + } catch (BKNotEnoughBookiesException e) { + if (!curRack.equals(NodeBase.ROOT)) { + curRack = NodeBase.ROOT; + prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble, + !enforceMinNumRacksPerWriteQuorum || prevNode == null); + } else { + throw e; + } } - boolean firstBookieInTheEnsemble = (null == prevNode); - prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble, - !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble); } List<BookieId> bookieList = ensemble.toList(); if (ensembleSize != bookieList.size()) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 568fdfc2f2..9d970c79bc 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1409,7 +1409,6 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { public boolean validate() { return false; } - }, false); fail("Should get not enough bookies exception since ensemble rejects all the nodes"); } catch 
(BKNotEnoughBookiesException bnebe) { @@ -1463,6 +1462,53 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { } } + @Test + public void testNewEnsembleWithPickDifferentRack() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + int ensembleSize = 3; + int writeQuorumSize = 3; + int ackQuorumSize = 2; + + Set<BookieId> excludeBookies = new HashSet<>(); + + for (int i = 0; i < 50; ++i) { + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List<BookieId> ensemble = ensembleResponse.getResult(); + if (ensemble.contains(addr1.toBookieId()) && ensemble.contains(addr2.toBookieId())) { + fail("addr1 and addr2 is same rack."); + } + } + + //addr4 shutdown. 
+ addrs.remove(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + for (int i = 0; i < 50; ++i) { + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List<BookieId> ensemble = ensembleResponse.getResult(); + assertTrue(ensemble.contains(addr1.toBookieId()) && ensemble.contains(addr2.toBookieId())); + } + } + @Test public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception { int numOfRacksToCreate = 6; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 7fe5b83db4..52b9a8a63b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -605,6 +605,64 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase { } } + @Test + public void testNewEnsembleWithPickDifferentRack() throws Exception { + ClientConfiguration clientConf = new ClientConfiguration(conf); + clientConf.setMinNumRacksPerWriteQuorum(2); + clientConf.setEnforceMinNumFaultDomainsForWrite(false); + repp.uninitalize(); + repp = new RegionAwareEnsemblePlacementPolicy(); + repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + 
// update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region-1/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region-1/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region-1/r2"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region-1/r3"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region-2/r1"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + int ensembleSize = 4; + int writeQuorumSize = 4; + int ackQuorumSize = 2; + + Set<BookieId> excludeBookies = new HashSet<>(); + + for (int i = 0; i < 50; ++i) { + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List<BookieId> ensemble = ensembleResponse.getResult(); + if (ensemble.contains(addr1.toBookieId()) && ensemble.contains(addr2.toBookieId())) { + fail("addr1 and addr2 is same rack."); + } + } + + //addr4 shutdown. + addrs.remove(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + for (int i = 0; i < 50; ++i) { + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + List<BookieId> ensemble = ensembleResponse.getResult(); + assertTrue(ensemble.contains(addr1.toBookieId()) && ensemble.contains(addr2.toBookieId())); + } + } + @Test public void testNewEnsembleWithEnoughRegions() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
