horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937364204
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo>
bookieInfoMap) {
}
}
+ @Override
+ public Map<Integer, BookieId>
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int
writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata) {
+ rwLock.readLock().lock();
+ try {
+ if (CollectionUtils.isEmpty(ensemble)) {
+ return Collections.emptyMap();
+ }
+ PlacementPolicyAdherence ensembleAdheringToPlacementPolicy =
isEnsembleAdheringToPlacementPolicy(ensemble,
+ writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL !=
ensembleAdheringToPlacementPolicy) {
+ return Collections.emptyMap();
+ }
+ Map<BookieId, Integer> bookieIndex = new HashMap<>();
+ for (int i = 0; i < ensemble.size(); i++) {
+ bookieIndex.put(ensemble.get(i), i);
+ }
+ Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+ Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+ for (BookieId bookieId : ensemble) {
+ //When ReplicationWorker.getUnderreplicatedFragments, the
bookie is alive, so the fragment is not
+ // data_loss. When find other rack bookie to replace, the
bookie maybe shutdown, so here we should pick
+ // the shutdown bookies. If the bookieId shutdown, put it to
inactive. When do replace, we should
+ // replace inactive bookie firstly.
+ BookieNode bookieNode = clone.get(bookieId);
+ if (bookieNode == null) {
+ List<BookieNode> list =
toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+ k -> new ArrayList<>());
+ list.add(new BookieNode(bookieId,
NetworkTopology.INACTIVE));
+ } else {
+ List<BookieNode> list =
toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+ k -> new ArrayList<>());
+ list.add(bookieNode);
+ }
+ }
+ for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+ Collections.shuffle(bookieNodes);
+ }
+
+ Map<String, List<BookieNode>> knownRackToBookies =
clone.values().stream()
+
.collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+ HashSet<String> knownRacks = new
HashSet<>(knownRackToBookies.keySet());
+
+ Set<BookieId> excludesBookies = new HashSet<>();
+
+ for (String key : toPlaceGroup.keySet()) {
+ List<BookieNode> sameRack = knownRackToBookies.get(key);
+ if (!CollectionUtils.isEmpty(sameRack)) {
+
excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+ }
+ }
+
+ Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+ boolean placeSucceed = false;
+ while (knownRacks.size() > 0) {
+ BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+ if (beReplaceNode == null) {
+ break;
+ }
+ Integer index = bookieIndex.get(beReplaceNode.getAddr());
+ try {
+ PlacementResult<BookieId> placementResult =
replaceBookie(ensemble.size(), writeQuorumSize,
+ ackQuorumSize, customMetadata, ensemble,
beReplaceNode.getAddr(), excludesBookies);
+ BookieNode replaceNode =
clone.get(placementResult.getResult());
+ String replaceNodeNetwork =
replaceNode.getNetworkLocation();
+ knownRacks.remove(replaceNodeNetwork);
+ List<BookieNode> nodes =
toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+ k -> new ArrayList<>());
+ nodes.add(replaceNode);
+ targetBookieAddresses.put(index, replaceNode.getAddr());
+ List<BookieNode> bookieNodes =
knownRackToBookies.get(replaceNodeNetwork);
+ if (!CollectionUtils.isEmpty(bookieNodes)) {
+ excludesBookies.addAll(
+
bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+ }
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ LOG.warn("Didn't find replaced bookie to adhere placement
policy.", e);
+ break;
+ }
+
+ List<BookieId> ensembles =
toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+ BookieNode::getAddr).collect(Collectors.toList());
+ ensembleAdheringToPlacementPolicy =
isEnsembleAdheringToPlacementPolicy(ensembles,
+ writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL !=
ensembleAdheringToPlacementPolicy) {
+ placeSucceed = true;
+ break;
+ }
+ }
Review Comment:
Yes, because bookies in the same rack are close together, we should consider the order of
the ensemble.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]