equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937302896


##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception 
{
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);

Review Comment:
   Why don't you set `enforceMinNumRacksPerWriteQuorum` to `false`? In my
   understanding, if it is `true`, we can't create an ensemble like `[r1, r1, r1]`.



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception 
{
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 
3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, 
rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), 
new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = 
repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);

Review Comment:
   [nits]
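   The expected and actual arguments are swapped: `assertEquals` takes the expected value first and the actual value second. The other assertions in this test have the same problem.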
   
https://github.com/junit-team/junit4/blob/r4.12/src/main/java/junit/framework/TestCase.java#L247-L254
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> 
bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> 
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int 
writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the 
bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the 
bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to 
inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, 
NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = 
clone.values().stream()
+                    
.collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new 
HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    
excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = 
replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, 
beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = 
clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = 
replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = 
toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = 
knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                
bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement 
policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = 
toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   In PR https://github.com/apache/bookkeeper/pull/2931, I tried to fix a
   similar issue while taking the above case into account.
   
   If this PR does not intend to handle the above case but it is still an issue,
   I'll try to fix it in https://github.com/apache/bookkeeper/pull/2931,
   following your interfaces.



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception 
{
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 
3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, 
rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), 
new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = 
repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        Map<Integer, BookieId> targetBookie =
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, 
ackQuorum, writeQuorum,
+                        Collections.emptyMap());

Review Comment:
   [nits]
   ```suggestion
                   repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, 
writeQuorum, ackQuorum,
                           Collections.emptyMap());
   ```
   
   The other calls in this test pass `ackQuorum` and `writeQuorum` in the wrong order too.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> 
bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> 
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int 
writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the 
bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the 
bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to 
inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, 
NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = 
clone.values().stream()
+                    
.collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new 
HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    
excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = 
replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, 
beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = 
clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = 
replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = 
toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = 
knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                
bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement 
policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = 
toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   In my understanding, we can't recover an ensemble fragment that has
   multiple bookies in a specific rack.
   
   For example, consider an `E=5, Qw=2, Qa=2` ensemble fragment such as
   `["128.0.0.0:3181", "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.6:3181",
   "128.0.0.4:3181"]`, using `RackawareEnsemblePlacementPolicy` with
   `enforceMinNumRacksPerWriteQuorum` set to `false`.
   (definition of rack: 
https://github.com/apache/bookkeeper/pull/3359/files#diff-aac3491b47dd2a6b2936e726b36151e2d232409878845c599beb7df5a3c739afR1489-R1498
 )
   
   When `/default-region/r3` (the third rack, `rackLocationNames[2]` in the test
   below) goes down, `RackawareEnsemblePlacementPolicy#replaceBookie` returns a
   new ensemble fragment chosen by random selection, such as `["128.0.0.0:3181",
   "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.2:3181", "128.0.0.4:3181"]`.
   
   After that, once `/default-region/r3` is recovered,
   `TopologyAwareEnsemblePlacementPolicy#replaceNotAdheringPlacementPolicyBookie`
   can't return a `MEETS_STRICT` result.
   
   Test cases are as below.
   
   ```sh
   % git --no-pager show --no-patch HEAD
   commit 11701505ffda2fe68c84a2621aaf595ef51d9979 (HEAD, 
horizonzy/feature-auto-recover-match-placement)
   Merge: 0ff5d96f3 c3706e9c2
   Author: horizonzy <[email protected]>
   Date:   Wed Jul 13 09:27:50 2022 +0800
   
       Merge branch 'master' into feature-auto-recover-match-placement
   
       # Conflicts:
       #       
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
       #       
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
   ```
   
   ```diff
   diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   index 0b80f7af0..5dee731e6 100644
   --- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   @@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
    import static 
org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
    import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
   
   +import com.google.common.collect.Sets;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import io.netty.util.HashedWheelTimer;
    import java.net.InetAddress;
   @@ -40,6 +41,7 @@ import junit.framework.TestCase;
    import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
    import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
    import 
org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence;
   +import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementResult;
    import 
org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
    import 
org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
    import 
org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
   @@ -1421,11 +1423,11 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
        public void testReplaceNotAdheringPlacementPolicyBookie() throws 
Exception {
            repp.uninitalize();
   
   -        int minNumRacksPerWriteQuorum = 3;
   +        int minNumRacksPerWriteQuorum = 2;
            ClientConfiguration clientConf = new ClientConfiguration(conf);
            clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
            // set enforceMinNumRacksPerWriteQuorum
   -        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
   +        clientConf.setEnforceMinNumRacksPerWriteQuorum(false);
            repp = new RackawareEnsemblePlacementPolicy();
            repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), 
timer, DISABLE_ALL,
                    NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
   @@ -1436,6 +1438,7 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
            String[] rackLocationNames = new String[numOfRacks];
            List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
            Map<BookieId, String> bookieRackMap = new HashMap<BookieId, 
String>();
   +        Map<String, Set<BookieId>> rackMap = new HashMap<>();
            BookieId bookieAddress;
   
            for (int i = 0; i < numOfRacks; i++) {
   @@ -1446,29 +1449,45 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
                    StaticDNSResolver.addNodeToRack("128.0.0." + index, 
rackLocationNames[i]);
                    bookieSocketAddresses.add(bookieAddress);
                    bookieRackMap.put(bookieAddress, rackLocationNames[i]);
   +                rackMap.computeIfAbsent(rackLocationNames[i], k -> 
Sets.newHashSet()).add(bookieAddress);
                }
            }
   
            repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), 
new HashSet<BookieId>());
   
   -        int writeQuorum = 3;
   -        int ackQuorum = 3;
   +        int writeQuorum = 2;
   +        int ackQuorum = 2;
   
            //test three knows bookie
            List<BookieId> knowsEnsemble = new ArrayList<>();
            knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
   +        knowsEnsemble.add(BookieId.parse("128.0.0.3:3181"));
            knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
   -        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
   +        //knowsEnsemble.add(BookieId.parse("128.0.0.2:3181")); // should be 
replaced to /r3 like 128.0.0.6
   +        knowsEnsemble.add(BookieId.parse("128.0.0.6:3181"));
   +        knowsEnsemble.add(BookieId.parse("128.0.0.4:3181"));
   
            PlacementPolicyAdherence placementPolicyAdherence = 
repp.isEnsembleAdheringToPlacementPolicy(
                    knowsEnsemble, writeQuorum, ackQuorum);
   -        assertEquals(placementPolicyAdherence, 
PlacementPolicyAdherence.FAIL);
   +        //assertEquals(PlacementPolicyAdherence.FAIL, 
placementPolicyAdherence);
   +
   +        // /default-region/r3 goes down
   +        repp.handleBookiesThatLeft(rackMap.get(rackLocationNames[2]));
   +
   +        final PlacementResult<BookieId> result = 
repp.replaceBookie(knowsEnsemble.size(), writeQuorum, ackQuorum,
   +                Collections.emptyMap(), knowsEnsemble, 
knowsEnsemble.get(3), Sets.newHashSet());
   +
   +        assertNotSame(PlacementPolicyAdherence.MEETS_STRICT, 
result.isAdheringToPolicy());
   +        assertNotSame(rackLocationNames[2], 
bookieRackMap.get(result.getResult()));
   +        knowsEnsemble.set(3, result.getResult());
   +        LOG.error("{}", knowsEnsemble);
   +
   +        // /default-region/r3 is recovered
   +        repp.handleBookiesThatJoined(rackMap.get(rackLocationNames[2]));
   
            Map<Integer, BookieId> targetBookie =
   -                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, 
ackQuorum, writeQuorum,
   +                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, 
writeQuorum, ackQuorum,
                            Collections.emptyMap());
   -        //should replace two bookie
   -        assertEquals(targetBookie.size(), 2);
   
            for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
                knowsEnsemble.set(entry.getKey(), entry.getValue());
   @@ -1478,6 +1497,10 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
                    knowsEnsemble, writeQuorum, ackQuorum);
            assertEquals(placementPolicyAdherence, 
PlacementPolicyAdherence.MEETS_STRICT);
   
   +        //should replace one bookie
   +        LOG.error(String.valueOf(targetBookie));
   +        assertEquals(1, targetBookie.size());
   +
            //test three unknowns bookie
            List<BookieId> unknownEnsembles = new ArrayList<>();
            unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to