hangc0276 commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914354951
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
Review Comment:
Do we need to deal with the `MEETS_SOFT` type?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -357,6 +366,65 @@ private boolean tryReadingFaultyEntries(LedgerHandle lh, LedgerFragment ledgerFr
         return (returnRCValue.get() == BKException.Code.OK);
     }
+    private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerHandle lh) {
+        if (!conf.getRepairedPlacementPolicyNotAdheringBookieEnable()) {
+            return Collections.emptySet();
+        }
+        long ledgerId = lh.getId();
+        Set<LedgerFragment> placementNotAdheringFragments = new HashSet<>();
+        CompletableFuture<Versioned<LedgerMetadata>> future = ledgerManager.readLedgerMetadata(
+                ledgerId).whenComplete((metadataVer, exception) -> {
+            if (exception == null) {
+                LedgerMetadata metadata = metadataVer.getValue();
+                int writeQuorumSize = metadata.getWriteQuorumSize();
+                int ackQuorumSize = metadata.getAckQuorumSize();
+                if (!metadata.isClosed()) {
+                    return;
+                }
+                Long curEntryId = null;
+                EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null;
+
+                for (Map.Entry<Long, ? extends List<BookieId>> entry : metadata.getAllEnsembles().entrySet()) {
+                    if (curEntryId != null) {
+                        if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
+                                == previousSegmentAdheringToPlacementPolicy) {
+                            LedgerFragment ledgerFragment = new LedgerFragment(lh, curEntryId,
+                                    entry.getKey() - 1, new HashSet<>());
+                            ledgerFragment.setReplicateType(LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT);
+                            placementNotAdheringFragments.add(ledgerFragment);
+                        }
+                    }
+                    previousSegmentAdheringToPlacementPolicy =
+                            admin.isEnsembleAdheringToPlacementPolicy(entry.getValue(),
+                                    writeQuorumSize, ackQuorumSize);
+                    curEntryId = entry.getKey();
+                }
+                if (curEntryId != null) {
+                    if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
Review Comment:
We only deal with the `FAIL` type here, but in the auditor check we still mark
ledgers of the `MEETS_SOFT` type as under-replicated. Do we also need to deal
with fragments of the `MEETS_SOFT` type?
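To make the question concrete, here is a minimal sketch of how the check could be parameterized so that `MEETS_SOFT` is optionally treated as repairable. Everything in it is hypothetical: `Adherence` is a stand-in for `EnsemblePlacementPolicy.PlacementPolicyAdherence`, and `PlacementRepairFilter` and the `includeMeetsSoft` flag do not exist in the PR. It only illustrates one possible shape, not what the PR should necessarily do.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for EnsemblePlacementPolicy.PlacementPolicyAdherence.
// Only the constant names mirror the real enum.
enum Adherence { FAIL, MEETS_SOFT, MEETS_STRICT }

// Hypothetical helper, not part of the PR.
final class PlacementRepairFilter {
    private PlacementRepairFilter() {
    }

    // Returns the adherence results that should produce a repair fragment.
    // includeMeetsSoft is an assumed switch for aligning with the auditor check.
    static Set<Adherence> repairableResults(boolean includeMeetsSoft) {
        return includeMeetsSoft
                ? EnumSet.of(Adherence.FAIL, Adherence.MEETS_SOFT)
                : EnumSet.of(Adherence.FAIL);
    }

    // True when a segment with this adherence result should be repaired.
    static boolean needsRepair(Adherence result, boolean includeMeetsSoft) {
        return repairableResults(includeMeetsSoft).contains(result);
    }
}
```

With such a helper, both `FAIL` comparisons in the loop would become a single `needsRepair(previousSegmentAdheringToPlacementPolicy, ...)` call, and the worker and the auditor could not drift apart on which types count.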
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+            return Collections.emptyMap();
+        }
+        Map<BookieId, Integer> bookieIndex = new HashMap<>();
+        for (int i = 0; i < ensemble.size(); i++) {
+            bookieIndex.put(ensemble.get(i), i);
+        }
+
+        Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+        Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+        for (BookieId bookieId : ensemble) {
+            //If the bookieId shutdown, put it to inactive.
+            BookieNode bookieNode = clone.get(bookieId);
Review Comment:
If a bookie shuts down, it is removed from `knownBookies` immediately. That
case belongs to the `DATA_LOSS` type.
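As a rough illustration of this point, the sketch below separates ensemble members that are absent from the known-bookie map from those that are still registered. It is a simplified stand-in, not the PR's code: bookie ids and nodes are plain strings, and `EnsembleMemberCheck` and `missingFromKnownBookies` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR. Strings stand in for BookieId and BookieNode.
final class EnsembleMemberCheck {
    private EnsembleMemberCheck() {
    }

    // Returns the ensemble members that have no entry in knownBookies.
    // Per the comment above, such bookies have already shut down, so they
    // would be a data-loss concern rather than a placement concern.
    static List<String> missingFromKnownBookies(List<String> ensemble, Map<String, String> knownBookies) {
        List<String> missing = new ArrayList<>();
        for (String bookieId : ensemble) {
            if (!knownBookies.containsKey(bookieId)) {
                missing.add(bookieId);
            }
        }
        return missing;
    }
}
```

If this list is non-empty, the placement repair path could skip the ensemble and leave it to the existing under-replication handling, instead of treating `clone.get(bookieId) == null` as an inactive bookie.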
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java:
##########
@@ -402,6 +402,7 @@ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener
             }
         }
         synchronized (listenerSet) {
+            listenerSet = listeners.computeIfAbsent(ledgerId, k -> new HashSet<>());
Review Comment:
Did you bring the previous change into this PR?