hangc0276 commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914354951


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> 
bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> 
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {

Review Comment:
   Do we need to deal with the `MEET_SOFT` type?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -357,6 +366,65 @@ private boolean tryReadingFaultyEntries(LedgerHandle lh, 
LedgerFragment ledgerFr
         return (returnRCValue.get() == BKException.Code.OK);
     }
 
+    private Set<LedgerFragment> 
getNeedRepairedPlacementNotAdheringFragments(LedgerHandle lh) {
+        if (!conf.getRepairedPlacementPolicyNotAdheringBookieEnable()) {
+            return Collections.emptySet();
+        }
+        long ledgerId = lh.getId();
+        Set<LedgerFragment> placementNotAdheringFragments = new HashSet<>();
+        CompletableFuture<Versioned<LedgerMetadata>> future = 
ledgerManager.readLedgerMetadata(
+                ledgerId).whenComplete((metadataVer, exception) -> {
+            if (exception == null) {
+                LedgerMetadata metadata = metadataVer.getValue();
+                int writeQuorumSize = metadata.getWriteQuorumSize();
+                int ackQuorumSize = metadata.getAckQuorumSize();
+                if (!metadata.isClosed()) {
+                    return;
+                }
+                Long curEntryId = null;
+                EnsemblePlacementPolicy.PlacementPolicyAdherence 
previousSegmentAdheringToPlacementPolicy = null;
+
+                for (Map.Entry<Long, ? extends List<BookieId>> entry : 
metadata.getAllEnsembles().entrySet()) {
+                    if (curEntryId != null) {
+                        if 
(EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
+                                == previousSegmentAdheringToPlacementPolicy) {
+                            LedgerFragment ledgerFragment = new 
LedgerFragment(lh, curEntryId,
+                                    entry.getKey() - 1, new HashSet<>());
+                            
ledgerFragment.setReplicateType(LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT);
+                            placementNotAdheringFragments.add(ledgerFragment);
+                        }
+                    }
+                    previousSegmentAdheringToPlacementPolicy =
+                            
admin.isEnsembleAdheringToPlacementPolicy(entry.getValue(),
+                                    writeQuorumSize, ackQuorumSize);
+                    curEntryId = entry.getKey();
+                }
+                if (curEntryId != null) {
+                    if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL

Review Comment:
   We only deal with the `FAIL` type, but in the auditor check, we still mark 
the `MEETS_SOFT` type ledger as under-replicated. Do we need to deal with the 
`MEETS_SOFT` type of fragments?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> 
bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> 
replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int 
writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = 
isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != 
ensembleAdheringToPlacementPolicy) {
+            return Collections.emptyMap();
+        }
+        Map<BookieId, Integer> bookieIndex = new HashMap<>();
+        for (int i = 0; i < ensemble.size(); i++) {
+            bookieIndex.put(ensemble.get(i), i);
+        }
+
+        Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+        Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+        for (BookieId bookieId : ensemble) {
+            //If the bookieId shutdown, put it to inactive.
+            BookieNode bookieNode = clone.get(bookieId);

Review Comment:
   If the bookie shutdown, it will be removed from knownBookies immediately. It 
belongs to `DATA_LOSS` type



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java:
##########
@@ -402,6 +402,7 @@ public void registerLedgerMetadataListener(long ledgerId, 
LedgerMetadataListener
                 }
             }
             synchronized (listenerSet) {
+                listenerSet = listeners.computeIfAbsent(ledgerId, k -> new 
HashSet<>());

Review Comment:
   Do you bing the previous change to this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to