eolivelli commented on code in PR #17577:
URL: https://github.com/apache/pulsar/pull/17577#discussion_r967633588


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -390,6 +390,21 @@ void reconsumeLater(Message<?> message,
      */
     void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
 
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message for each topic or partition.
+     *
+     * <p>This method will block until the acknowledge has been sent to the brokers of each topic or partition.

Review Comment:
   This is not correct.
   Please remove this sentence entirely.
   Acks are always sent on a best-effort basis.
   Only if you do something in a transaction can you have such guarantees.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -616,6 +642,25 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, T
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
     }
 
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap) throws PulsarClientException {
+        return acknowledgeCumulativeAsync(topicToMessageIdMap, null);
+    }
+
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap, Transaction txn) throws PulsarClientException {

Review Comment:
   It looks like the keys are useless.
   So why do we need this API?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -616,6 +642,25 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, T
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
     }
 
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap) throws PulsarClientException {
+        return acknowledgeCumulativeAsync(topicToMessageIdMap, null);
+    }
+
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap, Transaction txn) throws PulsarClientException {
+        validateTopicToMessageIdMap(topicToMessageIdMap);
+        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+                    "Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
+        }
+
+        TransactionImpl txnImpl = null;

Review Comment:
   It looks like the new tests do not cover the TC branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

