congbobo184 commented on code in PR #16707:
URL: https://github.com/apache/pulsar/pull/16707#discussion_r926310974


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
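   Change the thread name, or reuse an existing thread. The new executor is created with the same thread name as the one above, "pulsar-backlog-quota-checker".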
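
   A minimal sketch of the first option (a distinct thread name per executor), assuming plain java.util.concurrent instead of Netty's DefaultThreadFactory. The class NamedExecutors, the helper newNamedScheduler, and the name "pulsar-transaction-log-flush-trigger" are hypothetical and are not taken from the PR:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class NamedExecutors {
    // Hypothetical helper: a single-thread scheduler whose worker thread carries the given name.
    static ScheduledExecutorService newNamedScheduler(String threadName) {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        });
    }

    public static void main(String[] args) throws Exception {
        // Each executor gets its own name, so thread dumps and logs can tell them apart.
        ScheduledExecutorService quotaChecker = newNamedScheduler("pulsar-backlog-quota-checker");
        ScheduledExecutorService flushTrigger = newNamedScheduler("pulsar-transaction-log-flush-trigger");

        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        System.out.println(quotaChecker.submit(whoAmI).get());
        System.out.println(flushTrigger.submit(whoAmI).get());

        quotaChecker.shutdownNow();
        flushTrigger.shutdownNow();
    }
}
```

   With separate names, a thread dump shows which scheduler a stuck task belongs to. Reusing an existing scheduler avoids the extra thread but couples the flush trigger to that scheduler's workload.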



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
                 if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call "handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) {
+                continue;
+            }
+            if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+                continue;
+            }
+            for (PendingAckMetadata ack : pendingAckLog.getPendingAckMetadatasList()){
+                if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), ack.getEntryId()) < 0){
+                    maxAcknowledgementPosition = PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+                    pendingAckLogHasMaxAckPosition = pendingAckLog;
+                }
+            }
+        }
+
+        if (pendingAckLogHasMaxAckPosition != null) {
+            handleMetadataEntry(logPosition, pendingAckLogHasMaxAckPosition);

Review Comment:
   Commit and abort records all need to go through handleMetadataEntry as well, right?
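
   Related to this question: in the hunk as quoted, the skip check joins the ABORT and COMMIT comparisons with &&. A single value cannot equal both constants, so that branch never skips anything. A minimal sketch of the difference, using a hypothetical stand-in enum Op rather than the real PendingAckOp; whether || is what the author intended is an assumption:

```java
public class SkipCheck {
    // Hypothetical stand-in for the operation values named in the diff.
    enum Op { ACK, COMMIT, ABORT }

    // The condition as quoted in the hunk: always false for any single value.
    static boolean skipsWithAnd(Op op) {
        return op == Op.ABORT && op == Op.COMMIT;
    }

    // The check as it would read if skipping both record kinds were intended.
    static boolean skipsWithOr(Op op) {
        return op == Op.ABORT || op == Op.COMMIT;
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + " and=" + skipsWithAnd(op) + " or=" + skipsWithOr(op));
        }
    }
}
```

   If commit and abort records do need to be handled, the branch would be dropped rather than switched to ||.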



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
                 if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call "handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) {
+                continue;
+            }
+            if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+                continue;
+            }
+            for (PendingAckMetadata ack : pendingAckLog.getPendingAckMetadatasList()){
+                if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), ack.getEntryId()) < 0){
+                    maxAcknowledgementPosition = PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+                    pendingAckLogHasMaxAckPosition = pendingAckLog;
+                }
+            }
+        }

Review Comment:
   We only need to handle this once, when logList.size() reaches batchSize. Don't run the handling on every add op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
