congbobo184 commented on code in PR #16707:
URL: https://github.com/apache/pulsar/pull/16707#discussion_r926446584


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -104,6 +126,11 @@ public MLPendingAckStore(ManagedLedger managedLedger, 
ManagedCursor cursor,
         this.subManagedCursor = subManagedCursor;
         this.logIndexBackoff = new 
LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
         this.maxIndexLag = logIndexBackoff.next(0);
+        this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, 
((ManagedLedgerImpl) managedLedger).getExecutor(),
+                scheduledExecutorService, PendingAckLogSerializer.INSTANCE,
+                bufferedWriterConfig.getBatchedWriteMaxRecords(), 
bufferedWriterConfig.getBatchedWriteMaxSize(),
+                bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), 
bufferedWriterConfig.isBatchEnabled());
+        this.batchedPendingAckLogsWaitingForHandle = new 
ArrayList<>(bufferedWriterConfig.getBatchedWriteMaxRecords());

Review Comment:
   We don't need to set the size, we can wait for it to automatically expand



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
                 if (exception instanceof 
ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new 
PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and 
Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, 
this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record 
that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior 
acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no 
longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, 
List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call 
"handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) 
{

Review Comment:
   ```suggestion
               if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
                       || pendingAckLog.getPendingAckOp() == 
PendingAckOp.COMMIT) {
   ```
   
   it is very similar to 
https://github.com/apache/pulsar/blob/4483df2b3d113bfb55a8bb1a9e6c77160257c475/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java#L253-L254
   
   may be we can advance logic



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
                 if (exception instanceof 
ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new 
PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and 
Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, 
this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record 
that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior 
acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no 
longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, 
List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call 
"handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) 
{
+                continue;
+            }
+            if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+                continue;
+            }
+            for (PendingAckMetadata ack : 
pendingAckLog.getPendingAckMetadatasList()){
+                if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), 
ack.getEntryId()) < 0){
+                    maxAcknowledgementPosition = 
PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+                    pendingAckLogHasMaxAckPosition = pendingAckLog;
+                }
+            }

Review Comment:
   this logic is very similar to 
   
https://github.com/apache/pulsar/blob/4483df2b3d113bfb55a8bb1a9e6c77160257c475/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java#L255-L257
   
   may we can handle once



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -332,14 +408,41 @@ public void run() {
                 while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 
&& fillEntryQueueCallback.fillQueue()) {
                     Entry entry = entryQueue.poll();
                     if (entry != null) {
-                        ByteBuf buffer = entry.getDataBuffer();
                         currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                        PendingAckMetadataEntry pendingAckMetadataEntry = new 
PendingAckMetadataEntry();
-                        pendingAckMetadataEntry.parseFrom(buffer, 
buffer.readableBytes());
-                        currentIndexLag.incrementAndGet();
-                        handleMetadataEntry(new 
PositionImpl(entry.getLedgerId(), entry.getEntryId()),
-                                pendingAckMetadataEntry);
-                        
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
+                        List<PendingAckMetadataEntry> logs = 
deserializeEntry(entry);
+                        if (logs.isEmpty()){
+                            continue;
+                        } else if (logs.size() == 1){
+                            currentIndexLag.incrementAndGet();
+                            PendingAckMetadataEntry log = logs.get(0);
+                            handleMetadataEntry(new 
PositionImpl(entry.getLedgerId(), entry.getEntryId()), log);
+                            pendingAckReplyCallBack.handleMetadataEntry(log);
+                        } else {
+                            /**
+                             * 1. Query batch index of current entry from 
cursor.
+                             * 2. Filter the data which has already ack.
+                             * 3. Build batched position and handle valid data.
+                             */
+                            long[] ackSetAlreadyAck = 
cursor.getDeletedBatchIndexesAsLongArray(
+                                    PositionImpl.get(entry.getLedgerId(), 
entry.getEntryId()));
+                            BitSetRecyclable bitSetAlreadyAck = null;
+                            if (ackSetAlreadyAck != null){
+                                bitSetAlreadyAck = 
BitSetRecyclable.valueOf(ackSetAlreadyAck);
+                            }

Review Comment:
   pending ack clear log data is use markDelete, so we don't need this logic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to