tarunannapareddy commented on code in PR #17577:
URL: https://github.com/apache/pulsar/pull/17577#discussion_r967648342


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -616,6 +642,25 @@ public CompletableFuture<Void> 
acknowledgeCumulativeAsync(MessageId messageId, T
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, 
Collections.emptyMap(), txnImpl);
     }
 
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, 
MessageId> topicToMessageIdMap) throws PulsarClientException {
+        return acknowledgeCumulativeAsync(topicToMessageIdMap, null);
+    }
+
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, 
MessageId> topicToMessageIdMap, Transaction txn) throws PulsarClientException {
+        validateTopicToMessageIdMap(topicToMessageIdMap);
+        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(
+                    "Cannot use cumulative acks on a 
non-exclusive/non-failover subscription"));
+        }
+
+        TransactionImpl txnImpl = null;

Review Comment:
   I am wary of the transaction usecase in cummulative ack over multiple 
partitions.
   1. I have removed acknowledgeCumulativeAsync(Map<String, MessageId> 
topicToMessageIdMap, Transaction txn) function i added, as it is not primary 
aim of this PR. please guide me if it is valid usecase. I will add support in 
Consumer.java Interface and write testcase
   2. I have added testcase for negative input case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to