This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 069b51f2982f71f1b689322c3ca7bd4ad1ebff2d
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Feb 17 10:12:49 2023 +0800

    New ensemble: choose bookies from different racks first. (#3721)
    
    Descriptions of the changes in this PR:
    Fixes #3720
    
    ### Motivation
    When creating a new ensemble, choose bookies from different racks first; if
    no bookies on an unused rack remain, fall back to picking bookies randomly.
    
    (cherry picked from commit 7f8c31bdad3bf7261e795feafd7f969f871baa82)
---
 .../RackawareEnsemblePlacementPolicyImpl.java      | 24 ++++++---
 .../TestRackawareEnsemblePlacementPolicy.java      | 48 +++++++++++++++++-
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 58 ++++++++++++++++++++++
 3 files changed, 123 insertions(+), 7 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 232e609138..b4e6fc23e1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -414,9 +414,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 }
                 return PlacementResult.of(addrs, 
PlacementPolicyAdherence.FAIL);
             }
-
+            //Choose different rack nodes.
+            String curRack = null;
             for (int i = 0; i < ensembleSize; i++) {
-                String curRack;
                 if (null == prevNode) {
                     if ((null == localNode) || 
defaultRack.equals(localNode.getNetworkLocation())) {
                         curRack = NodeBase.ROOT;
@@ -424,11 +424,23 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                         curRack = localNode.getNetworkLocation();
                     }
                 } else {
-                    curRack = "~" + prevNode.getNetworkLocation();
+                    if (!curRack.startsWith("~")) {
+                        curRack = "~" + prevNode.getNetworkLocation();
+                    } else {
+                        curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR 
+ prevNode.getNetworkLocation();
+                    }
+                }
+                try {
+                    prevNode = selectRandomFromRack(curRack, excludeNodes, 
ensemble, ensemble);
+                } catch (BKNotEnoughBookiesException e) {
+                    if (!curRack.equals(NodeBase.ROOT)) {
+                        curRack = NodeBase.ROOT;
+                        prevNode = selectFromNetworkLocation(curRack, 
excludeNodes, ensemble, ensemble,
+                                !enforceMinNumRacksPerWriteQuorum || prevNode 
== null);
+                    } else {
+                        throw e;
+                    }
                 }
-                boolean firstBookieInTheEnsemble = (null == prevNode);
-                prevNode = selectFromNetworkLocation(curRack, excludeNodes, 
ensemble, ensemble,
-                        !enforceMinNumRacksPerWriteQuorum || 
firstBookieInTheEnsemble);
             }
             List<BookieId> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 568fdfc2f2..9d970c79bc 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -1409,7 +1409,6 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
                         public boolean validate() {
                             return false;
                         }
-
                     }, false);
             fail("Should get not enough bookies exception since ensemble 
rejects all the nodes");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1463,6 +1462,53 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
         }
     }
 
+    @Test
+    public void testNewEnsembleWithPickDifferentRack() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), 
"/default-region/r1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/default-region/r1");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/default-region/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/default-region/r3");
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<BookieId>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+        int ensembleSize = 3;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+
+        Set<BookieId> excludeBookies = new HashSet<>();
+
+        for (int i = 0; i < 50; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                    repp.newEnsemble(ensembleSize, writeQuorumSize,
+                            ackQuorumSize, null, excludeBookies);
+            List<BookieId> ensemble = ensembleResponse.getResult();
+            if (ensemble.contains(addr1.toBookieId()) && 
ensemble.contains(addr2.toBookieId())) {
+                fail("addr1 and addr2 is same rack.");
+            }
+        }
+
+        //addr4 shutdown.
+        addrs.remove(addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+        for (int i = 0; i < 50; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                    repp.newEnsemble(ensembleSize, writeQuorumSize,
+                            ackQuorumSize, null, excludeBookies);
+              List<BookieId> ensemble = ensembleResponse.getResult();
+            assertTrue(ensemble.contains(addr1.toBookieId()) && 
ensemble.contains(addr2.toBookieId()));
+        }
+    }
+
     @Test
     public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
         int numOfRacksToCreate = 6;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 7fe5b83db4..52b9a8a63b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -605,6 +605,64 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         }
     }
 
+    @Test
+    public void testNewEnsembleWithPickDifferentRack() throws Exception {
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(2);
+        clientConf.setEnforceMinNumFaultDomainsForWrite(false);
+        repp.uninitalize();
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), 
timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region-1/r1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region-1/r1");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region-1/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region-1/r3");
+        StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region-2/r1");
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<BookieId>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+        int ensembleSize = 4;
+        int writeQuorumSize = 4;
+        int ackQuorumSize = 2;
+
+        Set<BookieId> excludeBookies = new HashSet<>();
+
+        for (int i = 0; i < 50; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                    repp.newEnsemble(ensembleSize, writeQuorumSize,
+                            ackQuorumSize, null, excludeBookies);
+            List<BookieId> ensemble = ensembleResponse.getResult();
+            if (ensemble.contains(addr1.toBookieId()) && 
ensemble.contains(addr2.toBookieId())) {
+                fail("addr1 and addr2 is same rack.");
+            }
+        }
+
+        //addr4 shutdown.
+        addrs.remove(addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+        for (int i = 0; i < 50; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                     repp.newEnsemble(ensembleSize, writeQuorumSize,
+                            ackQuorumSize, null, excludeBookies);
+            List<BookieId> ensemble = ensembleResponse.getResult();
+            assertTrue(ensemble.contains(addr1.toBookieId()) && 
ensemble.contains(addr2.toBookieId()));
+        }
+    }
+
     @Test
     public void testNewEnsembleWithEnoughRegions() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);

Reply via email to