poorbarcode commented on code in PR #16707:
URL: https://github.com/apache/pulsar/pull/16707#discussion_r926557032
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
if (exception instanceof
ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
- buf.release();
completableFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
return completableFuture;
}
+ /**
+ * Build the index mapping of Transaction pending ack log (aka t-log) and
Topic message log (aka m-log).
+ * When m-log has been ack, t-log which holds m-log is no longer useful,
this method builder the mapping of them.
+ *
+ * If a Ledger Entry has many t-log, we only need to care about the record
that carries the largest acknowledgement
+ * info. Because all Commit/Abort log after this record describes behavior
acknowledgement, if the behavior
+ * acknowledgement has been handle correct, these Commit/Abort log is no
longer useful.
+ * @param logPosition The position of batch log Entry.
+ * @param logList Pending ack log records in a batch log Entry.
+ */
+ private void handleMetadataEntry(PositionImpl logPosition,
List<PendingAckMetadataEntry> logList) {
+ // Find the record that carries the largest ack info, and call
"handleMetadataEntry(position, pendingAckLog)"
+ PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+ PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+ for (int i = logList.size() - 1; i >= 0; i--){
+ PendingAckMetadataEntry pendingAckLog = logList.get(i);
+ if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+ && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT)
{
+ continue;
+ }
+ if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+ continue;
+ }
+ for (PendingAckMetadata ack :
pendingAckLog.getPendingAckMetadatasList()){
+ if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(),
ack.getEntryId()) < 0){
+ maxAcknowledgementPosition =
PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+ pendingAckLogHasMaxAckPosition = pendingAckLog;
+ }
+ }
Review Comment:
Already fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]