horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937364204


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> 
bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> 
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int 
writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the 
bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the 
bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to 
inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, 
NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = 
toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = 
clone.values().stream()
+                    
.collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new 
HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    
excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = 
replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, 
beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = 
clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = 
replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = 
toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = 
knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                
bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement 
policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = 
toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   Yes, because bookies in the same rack are close, we should consider the 
order of the ensemble.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to