MarvinCai commented on a change in pull request #4265: [transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r287216445
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -681,5 +862,66 @@ void topicTerminated() {
         }
     }
+    /**
+     * Commit a transaction.
+     *
+     * @param txnID {@link TxnID} to identify the transaction.
+     * @param properties Additional user-defined properties that can be associated with a particular cursor position.
+     * @throws IllegalArgumentException
+     * @throws TransactionConflictException
+     */
+    public synchronized void commitTxn(TxnID txnID, Map<String,Long> properties) throws TransactionConflictException {
+        if (!this.pendingAckMessagesMap.containsKey(txnID)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnID.getMostSigBits()
+                    + txnID.getLeastSigBits() + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        // This shouldn't happen.

Review comment:
After reconsideration, I don't think this check makes sense. When a client commits a transaction, it is not necessarily the ongoing transaction that previously did a cumulative ack on this subscription. But we could add a timeout check here, so that when a transaction has done a cumulative ack but has not committed within the timeout, its position is removed. That way, subsequent transactions can still use cumulative ack.
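To make the timeout idea concrete, here is a rough, standalone sketch; it is not the code in this PR. All names in it (`PendingCumulativeAckSlot`, `tryCumulativeAck`, `commit`, `pendingPosition`) are hypothetical, a `String` stands in for `TxnID`, a `long` stands in for the cursor position, and the clock is injected only so that expiry is easy to exercise. It assumes at most one transaction may hold a pending cumulative ack at a time.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical holder for the one transaction that currently has a pending
 * cumulative ack. The pending position is dropped once the timeout elapses
 * without a commit, so later transactions are not blocked forever.
 */
final class PendingCumulativeAckSlot {
    private final long timeoutMillis;
    private final LongSupplier clock;   // assumption: injectable time source, in milliseconds

    private String ownerTxnId;          // null when no transaction holds the slot
    private long pendingPosition;       // stand-in for a cursor position
    private long acquiredAtMillis;      // when the current owner first took the slot

    PendingCumulativeAckSlot(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    /** Returns true if txnId holds the pending cumulative ack after this call. */
    synchronized boolean tryCumulativeAck(String txnId, long position) {
        expireIfTimedOut();
        if (ownerTxnId != null && !ownerTxnId.equals(txnId)) {
            return false;               // another live transaction holds the slot: conflict
        }
        if (ownerTxnId == null) {
            ownerTxnId = txnId;
            acquiredAtMillis = clock.getAsLong();   // timer starts at first acquisition only
        }
        pendingPosition = position;
        return true;
    }

    /** Returns true if txnId still held the slot and its pending ack is now released. */
    synchronized boolean commit(String txnId) {
        expireIfTimedOut();
        if (!txnId.equals(ownerTxnId)) {
            return false;               // never acked cumulatively here, or already expired
        }
        ownerTxnId = null;
        return true;
    }

    /** The pending position, if a transaction still holds the slot. */
    synchronized OptionalLong pendingPosition() {
        expireIfTimedOut();
        return ownerTxnId == null ? OptionalLong.empty() : OptionalLong.of(pendingPosition);
    }

    // Drops the pending position when the owner has not committed in time.
    private void expireIfTimedOut() {
        if (ownerTxnId != null && clock.getAsLong() - acquiredAtMillis >= timeoutMillis) {
            ownerTxnId = null;
        }
    }
}
```

In this sketch a commit from a transaction that never did a cumulative ack simply returns false instead of throwing, which matches the point above that such a commit is not an error. Whether a repeated ack by the same transaction should restart the timer is a design choice; here it does not.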