codelipenghui commented on a change in pull request #4265: [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r293306740
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 ##########
 @@ -723,6 +917,115 @@ void topicTerminated() {
         }
     }
 
+    /**
+     * Commit a transaction.
+     *
+     * @param txnId         {@link TxnID} to identify the transaction.
+     * @param properties    Additional user-defined properties that can be associated with a particular cursor position.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
+     */
+    public synchronized CompletableFuture<Void> commitTxn(TxnID txnId, Map<String,Long> properties) {
+
+        if (pendingAckMessagesMap != null && !this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnId + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        CompletableFuture<Void> marketDeleteFuture = new CompletableFuture<>();
+
+        MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                PositionImpl pos = (PositionImpl) ctx;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Mark deleted messages until position {}", topicName, subName, pos);
+                }
+                marketDeleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Failed to mark delete for position {}", topicName, subName, ctx, exception);
+                }
+                marketDeleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        DeleteCallback deleteCallback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                log.warn("[{}][{}] Failed to delete message at {}", topicName, subName, ctx, exception);
+                deleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        // It's valid to create a transaction and then commit it without performing any operation,
+        // in which case pendingAckMessagesMap is null.
+        ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn = pendingAckMessagesMap != null ?
+                                                this.pendingAckMessagesMap.remove(txnId) : new ConcurrentOpenHashSet<>();
+        List<Position> positions = pendingAckMessageForCurrentTxn.values();
+        // Materialize all single acks.
+        cursor.asyncDelete(positions, deleteCallback, positions);
+        if (pendingAckMessages != null) {
+            positions.forEach(position -> this.pendingAckMessages.remove(position));
+        }
+
+        // Materialize cumulative ack.
+        cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == properties)?
+                Collections.emptyMap() : properties, markDeleteCallback, this.pendingCumulativeAckMessage);
+
+        // Reset txnID and position for cumulative ack.
+        PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
+        POSITION_UPDATER.set(this, null);
+        deleteFuture.runAfterBoth(marketDeleteFuture, () -> commitFuture.complete(null))
+                    .exceptionally((exception) -> {
+                        commitFuture.completeExceptionally(exception);
+                        return null;
+                    });
+
+        return commitFuture;
+    }
+
+    /**
+     * Abort a transaction.
+     *
+     * @param txnId  {@link TxnID} to identify the transaction.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
+     */
+    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId) {
 
 Review comment:
   Does abortTxn need to redeliver the messages that belong to the txnId? When the broker
   handles redeliverUnacknowledgedMessages, it skips messages that are part of a
   transaction, so if abortTxn does not redeliver these messages, I think they will
   never be redelivered.
   
   I think we need to call dispatcher.redeliverUnacknowledgedMessages(consumer,
   pendingAckMessagesMap.get(txnId)).
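
   To illustrate the concern, here is a minimal sketch of the abort path. It uses
   simplified stand-in types: String transaction ids, long positions, and a plain list in
   place of the dispatcher. The class `AbortRedeliverySketch` and all of its methods are
   hypothetical and are not the real PersistentSubscription or Dispatcher API. It only
   shows the ordering I have in mind: release the pending acks first, then redeliver.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a subscription; NOT the real Pulsar class.
public class AbortRedeliverySketch {
    // txnId -> positions acked inside that transaction but not yet committed.
    private final Map<String, Set<Long>> pendingAckMessagesMap = new HashMap<>();
    // Positions handed back for redelivery (stands in for the dispatcher).
    private final List<Long> redelivered = new ArrayList<>();

    public synchronized void ackInTxn(String txnId, long position) {
        pendingAckMessagesMap.computeIfAbsent(txnId, k -> new HashSet<>()).add(position);
    }

    // Mirrors the skip described above: positions held by an open
    // transaction are filtered out and not redelivered.
    public synchronized void redeliverUnacknowledged(List<Long> positions) {
        for (long p : positions) {
            boolean inTxn = pendingAckMessagesMap.values().stream()
                    .anyMatch(held -> held.contains(p));
            if (!inTxn) {
                redelivered.add(p);
            }
        }
    }

    public synchronized void abortTxn(String txnId) {
        // Remove the transaction's pending acks first, so the filter above
        // no longer skips these positions.
        Set<Long> positions = pendingAckMessagesMap.remove(txnId);
        if (positions == null) {
            throw new IllegalArgumentException("Transaction with id:" + txnId + " not found.");
        }
        // Then explicitly redeliver them; otherwise nothing triggers redelivery.
        redeliverUnacknowledged(new ArrayList<>(positions));
    }

    public synchronized List<Long> redelivered() {
        return new ArrayList<>(redelivered);
    }
}
```

   In this sketch, a redelivery request made while the transaction is still open skips
   the held position, and only the explicit call inside abortTxn hands it back.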

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
