equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939784939
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
}
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
}
+
+ @Override
+ public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble) {
+ rwLock.readLock().lock();
+ try {
+ PlacementPolicyAdherence currentPlacementAdherence =
isEnsembleAdheringToPlacementPolicy(
+ currentEnsemble, writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+ return PlacementResult.of(new ArrayList<>(currentEnsemble),
currentPlacementAdherence);
+ }
+ for (BookieId bookieId : currentEnsemble) {
+ if (!knownBookies.containsKey(bookieId)) {
+ excludeBookies.add(bookieId);
+ }
+ }
+ PlacementResult<List<BookieId>> placementResult =
PlacementResult.of(Collections.emptyList(),
+ PlacementPolicyAdherence.FAIL);
+ int minDiffer = Integer.MAX_VALUE;
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ PlacementResult<List<BookieId>> result =
doReplaceToAdherePlacementPolicy(ensembleSize,
+ writeQuorumSize, ackQuorumSize, excludeBookies,
currentEnsemble, i);
+ if (PlacementPolicyAdherence.FAIL ==
result.getAdheringToPolicy()) {
+ continue;
+ }
+ int differ = differBetweenBookies(currentEnsemble,
result.getResult());
+ if (differ < minDiffer) {
+ minDiffer = differ;
+ placementResult = result;
+ if (minDiffer == 1) {
+ break;
+ }
+ }
+ }
+ return placementResult;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble,
+ int startIndex) {
+ final List<BookieNode> provisionalEnsembleNodes =
currentEnsemble.stream()
+ .map(this::convertBookieToNode).collect(Collectors.toList());
+ final Set<Node> excludeNodes = convertBookiesToNodes(
+ addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ final RRTopologyAwareCoverageEnsemble ensemble =
+ new RRTopologyAwareCoverageEnsemble(
+ ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ RACKNAME_DISTANCE_FROM_LEAVES,
+ null,
+ null,
+ minNumRacksPerWriteQuorumForThisEnsemble);
+ int numRacks = topology.getNumOfRacks();
+ // only one rack or less than
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+ if (numRacks < 2 || numRacks <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ LOG.warn("Skip ensemble relocation because the cluster has only {}
rack.", numRacks);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
+ BookieNode prevNode = null;
+ final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+ // use same bookie at first to reduce ledger replication
+ if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode,
ensemble)
+ && ensemble.addNode(firstNode)) {
+ excludeNodes.add(firstNode);
+ prevNode = firstNode;
+ }
Review Comment:
`firstNode` should be set to the first index of `provisionalEnsembleNodes`,
because this variable is used in `replaceToAdherePlacementPolicyInternal` for
calculation.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
}
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
}
+
+ @Override
+ public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble) {
+ rwLock.readLock().lock();
+ try {
+ PlacementPolicyAdherence currentPlacementAdherence =
isEnsembleAdheringToPlacementPolicy(
+ currentEnsemble, writeQuorumSize, ackQuorumSize);
+ if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+ return PlacementResult.of(new ArrayList<>(currentEnsemble),
currentPlacementAdherence);
+ }
+ for (BookieId bookieId : currentEnsemble) {
+ if (!knownBookies.containsKey(bookieId)) {
+ excludeBookies.add(bookieId);
+ }
+ }
+ PlacementResult<List<BookieId>> placementResult =
PlacementResult.of(Collections.emptyList(),
+ PlacementPolicyAdherence.FAIL);
+ int minDiffer = Integer.MAX_VALUE;
+ for (int i = 0; i < currentEnsemble.size(); i++) {
+ PlacementResult<List<BookieId>> result =
doReplaceToAdherePlacementPolicy(ensembleSize,
+ writeQuorumSize, ackQuorumSize, excludeBookies,
currentEnsemble, i);
+ if (PlacementPolicyAdherence.FAIL ==
result.getAdheringToPolicy()) {
+ continue;
+ }
+ int differ = differBetweenBookies(currentEnsemble,
result.getResult());
+ if (differ < minDiffer) {
+ minDiffer = differ;
+ placementResult = result;
+ if (minDiffer == 1) {
+ break;
+ }
+ }
+ }
+ return placementResult;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieId> excludeBookies,
+ List<BookieId> currentEnsemble,
+ int startIndex) {
+ final List<BookieNode> provisionalEnsembleNodes =
currentEnsemble.stream()
+ .map(this::convertBookieToNode).collect(Collectors.toList());
+ final Set<Node> excludeNodes = convertBookiesToNodes(
+ addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+ int minNumRacksPerWriteQuorumForThisEnsemble =
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+ final RRTopologyAwareCoverageEnsemble ensemble =
+ new RRTopologyAwareCoverageEnsemble(
+ ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ RACKNAME_DISTANCE_FROM_LEAVES,
+ null,
+ null,
+ minNumRacksPerWriteQuorumForThisEnsemble);
+ int numRacks = topology.getNumOfRacks();
+ // only one rack or less than
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+ if (numRacks < 2 || numRacks <
minNumRacksPerWriteQuorumForThisEnsemble) {
+ LOG.warn("Skip ensemble relocation because the cluster has only {}
rack.", numRacks);
+ return PlacementResult.of(Collections.emptyList(),
PlacementPolicyAdherence.FAIL);
+ }
Review Comment:
This rack-count check should move to `replaceToAdherePlacementPolicy` and run
first there, to avoid redundant calls.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
return true;
}
+ /**
+ * Returns placement result. If the currentEnsemble is not adhering
placement policy, returns new ensemble that
+ * adheres placement policy. It should be implemented so as to minify the
number of bookies replaced.
+ *
+ * @param ensembleSize
+ * ensemble size
+ * @param writeQuorumSize
+ * writeQuorumSize of the ensemble
+ * @param ackQuorumSize
+ * ackQuorumSize of the ensemble
+ * @param excludeBookies
+ * bookies that should not be considered as targets
+ * @param currentEnsemble
+ * current ensemble
+ * @return a placement result
+ */
+ default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
Review Comment:
What about the other TopologyAware classes, such as RegionAware and Zoneaware?
Will they be implemented in the next PR? Or should they temporarily implement the [old
approach](https://github.com/horizonzy/bookkeeper/blob/7669c7ad15952f5ea0c448d4003e5aabcebebd94/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java#L794-L886)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]