codelipenghui commented on a change in pull request #4265: 
[transaction][acknowledge] Introduce in-memory PENDING_ACK state in 
acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r293298678
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 ##########
 @@ -723,6 +917,115 @@ void topicTerminated() {
         }
     }
 
+    /**
+     * Commit a transaction.
+     *
+     * @param txnId         {@link TxnID} to identify the transaction.
+     * @param properties    Additional user-defined properties that can be 
associated with a particular cursor position.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in 
this subscription.
+     */
+    public synchronized CompletableFuture<Void> commitTxn(TxnID txnId, 
Map<String,Long> properties) {
+
+        if (pendingAckMessagesMap != null && 
!this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction with id:" + txnId + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        CompletableFuture<Void> marketDeleteFuture = new CompletableFuture<>();
+
+        MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                PositionImpl pos = (PositionImpl) ctx;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Mark deleted messages until position 
{}", topicName, subName, pos);
+                }
+                marketDeleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Failed to mark delete for position ", 
topicName, subName, ctx, exception);
+                }
+                marketDeleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        DeleteCallback deleteCallback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Deleted message at {}", topicName, 
subName, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object 
ctx) {
+                if (log.isDebugEnabled()) {
+                    log.warn("[{}][{}] Failed to delete message at {}", 
topicName, subName, ctx, exception);
+                }
+                deleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        // It's valid to create transaction then commit without doing any 
operation, which will cause
+        // pendingAckMessagesMap to be null.
+        ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn = 
pendingAckMessagesMap != null ?
+                                                
this.pendingAckMessagesMap.remove(txnId) : new ConcurrentOpenHashSet();
 
 Review comment:
   Is it necessary to create a new Set here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to