equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939784939


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean 
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = 
isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), 
currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = 
PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = 
doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, 
currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == 
result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, 
result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = 
currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than 
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < 
minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} 
rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), 
PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, 
ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }

Review Comment:
   Should set firstNode to the first index of the `provisionalEnsembleNodes`. 
This variable is used in `replaceToAdherePlacementPolicyInternal` for 
calculation.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean 
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = 
isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), 
currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = 
PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = 
doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, 
currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == 
result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, 
result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = 
currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = 
Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than 
minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < 
minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} 
rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), 
PlacementPolicyAdherence.FAIL);
+        }

Review Comment:
   Should move to replaceToAdherePlacementPolicy and check it first to reduce 
redundant calls.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean 
areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering 
placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the 
number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+ *                writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   What about other TopologyAware classes like RegionAware, Zoneaware?
   Implement in the next PR? Or temporally implement the [old 
approach](https://github.com/horizonzy/bookkeeper/blob/7669c7ad15952f5ea0c448d4003e5aabcebebd94/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java#L794-L886)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to