gaoran10 commented on code in PR #20980:
URL: https://github.com/apache/pulsar/pull/20980#discussion_r1292074537
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -271,7 +266,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.completeExceptionally(exception2);
}
});
- if (to.equals(id)) {
+ if (lastReadId.equals(id)) {
Review Comment:
Yes. If phase two does not stop at `lastReadId`, the next entry may be dropped
because it is not present in `latestForKey`.
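To make the concern concrete, here is a minimal sketch of the failure mode. It is a simplified model, not the actual `TwoPhaseCompactor` code: `Entry`, `phaseOne`, and `phaseTwoSkipped` are hypothetical names, and plain `long` ids stand in for `MessageId`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    /** Hypothetical stand-in for a raw message: a numeric id plus a key. */
    static final class Entry {
        final long id;
        final String key;

        Entry(long id, String key) {
            this.id = id;
            this.key = key;
        }
    }

    /** Phase one: record the newest id per key, reading up to lastReadId inclusive. */
    static Map<String, Long> phaseOne(List<Entry> log, long lastReadId) {
        Map<String, Long> latestForKey = new HashMap<>();
        for (Entry e : log) {
            if (e.id > lastReadId) {
                break;
            }
            latestForKey.put(e.key, e.id);
        }
        return latestForKey;
    }

    /** Phase two: return the ids that were read but not retained, stopping after stopId. */
    static List<Long> phaseTwoSkipped(List<Entry> log, Map<String, Long> latestForKey, long stopId) {
        List<Long> skipped = new ArrayList<>();
        for (Entry e : log) {
            if (e.id > stopId) {
                break;
            }
            Long latest = latestForKey.get(e.key);
            // An entry is kept only if phase one recorded it as the latest for its key.
            if (latest == null || latest != e.id) {
                skipped.add(e.id);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(1, "a"));
        log.add(new Entry(2, "b"));
        log.add(new Entry(3, "a"));
        Map<String, Long> latestForKey = phaseOne(log, 3);

        // A new entry arrives after phase one has finished.
        log.add(new Entry(4, "c"));

        // Stopping at the last id phase one read skips only the superseded entry 1.
        System.out.println(phaseTwoSkipped(log, latestForKey, 3));
        // Reading past it also skips entry 4, whose key phase one never saw.
        System.out.println(phaseTwoSkipped(log, latestForKey, Long.MAX_VALUE));
    }
}
```

In this model, stopping phase two at the same id where phase one stopped leaves entry 4 unread, so a later compaction run can still pick it up.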
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -162,46 +161,42 @@ private void phaseOneLoop(RawReader reader,
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
- MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
- MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+ MessageId first = firstMessageId.orElse(id);
if (id.compareTo(lastMessageId) == 0) {
- loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to,
- lastMessageId, latestForKey));
+ loopPromise.complete(new PhaseOneResult(first, lastMessageId, latestForKey));
} else {
phaseOneLoop(reader,
- Optional.ofNullable(first),
- Optional.ofNullable(to),
+ Optional.of(first),
lastMessageId,
latestForKey, loopPromise);
}
- } finally {
- m.close();
}
}, scheduler).exceptionally(ex -> {
loopPromise.completeExceptionally(ex);
return null;
});
}
- private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
- Map<String, MessageId> latestForKey, BookKeeper bk) {
+ private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId lastReadId,
Review Comment:
Do we need to remove the `to` parameter?