equanz commented on a change in pull request #2931:
URL: https://github.com/apache/bookkeeper/pull/2931#discussion_r828814467
##########
File path:
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
##########
@@ -1071,4 +1074,175 @@ public boolean
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
}
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
}
+
+ @Override
+ public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble) {
+ rwLock.readLock().lock();
+ try {
+ final List<BookieNode> provisionalEnsembleNodes =
currentEnsemble.stream()
+
.map(this::convertBookieToNode).collect(Collectors.toList());
+ final Set<Node> excludeNodes = convertBookiesToNodes(
+
addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ final RRTopologyAwareCoverageEnsemble ensemble =
+ new RRTopologyAwareCoverageEnsemble(
+ ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ RACKNAME_DISTANCE_FROM_LEAVES,
+ null,
+ null,
+ minNumRacksPerWriteQuorumForThisEnsemble);
+
+ int numRacks = topology.getNumOfRacks();
+ // only one rack or less than
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+ if (numRacks < 2 || numRacks <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ LOG.warn("Skip ensemble relocation because the cluster has
only {} rack.", numRacks);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+
+ BookieNode prevNode = null;
+ final BookieNode firstNode = provisionalEnsembleNodes.get(0);
+ // use same bookie at first to reduce ledger replication
+ if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode,
ensemble)
+ && ensemble.addNode(firstNode)) {
+ excludeNodes.add(firstNode);
+ prevNode = firstNode;
+ }
+
+ for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+ final String curRack;
+ if (null == prevNode) {
+ if ((null == localNode) ||
defaultRack.equals(localNode.getNetworkLocation())) {
+ curRack = NodeBase.ROOT;
+ } else {
+ curRack = localNode.getNetworkLocation();
+ }
+ } else {
+ curRack = "~" + prevNode.getNetworkLocation();
+ }
+
+ try {
+ prevNode = replaceToAdherePlacementPolicyInternal(
+ curRack, excludeNodes, ensemble, ensemble,
+ provisionalEnsembleNodes, i, ensembleSize,
minNumRacksPerWriteQuorumForThisEnsemble);
+ // replace to newer node
+ provisionalEnsembleNodes.set(i, prevNode);
+ } catch (BKNotEnoughBookiesException e) {
+ LOG.warn("Skip ensemble relocation because the cluster has
not enough bookies.");
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ }
+ List<BookieId> bookieList = ensemble.toList();
+ if (ensembleSize != bookieList.size()) {
+ LOG.warn("Not enough {} bookies are available to form an
ensemble : {}.",
+ ensembleSize, bookieList);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ return PlacementResult.of(bookieList,
+ isEnsembleAdheringToPlacementPolicy(
+ bookieList, writeQuorumSize, ackQuorumSize));
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ private BookieNode replaceToAdherePlacementPolicyInternal(
+ String netPath, Set<Node> excludeBookies, Predicate<BookieNode>
predicate,
+ Ensemble<BookieNode> ensemble, List<BookieNode>
provisionalEnsembleNodes, int ensembleIndex,
+ int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble)
throws BKNotEnoughBookiesException {
+ final BookieNode currentNode =
provisionalEnsembleNodes.get(ensembleIndex);
+ // if the current bookie could be applied to the ensemble, apply it to
minify the number of bookies replaced
+ if (!excludeBookies.contains(currentNode) &&
predicate.apply(currentNode, ensemble)) {
+ if (ensemble.addNode(currentNode)) {
+ // add the candidate to exclude set
+ excludeBookies.add(currentNode);
Review comment:
I modified `excludeBookies` inside the helper so that the excluded bookie is
propagated back to the calling method, following the existing pattern below:
https://github.com/apache/bookkeeper/blob/release-4.14.4/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L671-L675
However, as you said, this might have unwanted side effects. Therefore, I'll
change it so that the bookie is added to `excludeBookies` in the calling method instead.
```
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index d9167c321..fa3e59cd6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -1131,6 +1131,13 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
prevNode = replaceToAdherePlacementPolicyInternal(
curRack, excludeNodes, ensemble, ensemble,
provisionalEnsembleNodes, i, ensembleSize,
minNumRacksPerWriteQuorumForThisEnsemble);
+ // got a good candidate
+ if (ensemble.addNode(prevNode)) {
+ // add the candidate to exclude set
+ excludeNodes.add(prevNode);
+ } else {
+ throw new BKNotEnoughBookiesException();
+ }
// replace to newer node
provisionalEnsembleNodes.set(i, prevNode);
} catch (BKNotEnoughBookiesException e) {
@@ -1159,10 +1166,6 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
final BookieNode currentNode =
provisionalEnsembleNodes.get(ensembleIndex);
// if the current bookie could be applied to the ensemble, apply it
to minify the number of bookies replaced
if (!excludeBookies.contains(currentNode) &&
predicate.apply(currentNode, ensemble)) {
- if (ensemble.addNode(currentNode)) {
- // add the candidate to exclude set
- excludeBookies.add(currentNode);
- }
return currentNode;
}
@@ -1234,11 +1237,6 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
continue;
}
BookieNode bn = (BookieNode) n;
- // got a good candidate
- if (ensemble.addNode(bn)) {
- // add the candidate to exclude set
- excludeBookies.add(bn);
- }
return bn;
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]