congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r887634742


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -414,13 +414,6 @@ void reconsumeLater(Message<?> message,
      * @param messageId
      *            The {@code MessageId} to be cumulatively acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction

Review Comment:
   don't delete this



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -578,11 +579,32 @@ protected CompletableFuture<Void> doAcknowledge(MessageId 
messageId, AckType ack
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList,
-                                                    AckType ackType,
-                                                    Map<String, Long> 
properties,
-                                                    TransactionImpl txn) {
-        return 
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, 
ackType, properties);
+    protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList, AckType ackType,
+                                                    Map<String, Long> 
properties, TransactionImpl txn) {
+
+        for (MessageId messageId : messageIdList) {
+            checkArgument(messageId instanceof MessageIdImpl);
+        }
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageIdList, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageIdList, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (txn != null) {
+            List<CompletableFuture<Void>> completableFutures = new 
LinkedList<>();
+            for (MessageId messageId : messageIdList) {
+                
completableFutures.add(doTransactionAcknowledgeForResponse(messageId, ackType, 
null,

Review Comment:
   the proto support send messgeIdList, so you should rewrite a command is 
better.
   
https://github.com/apache/pulsar/blob/9b01d1c472aa5cde5205ad86fa3b8f998da95f35/pulsar-common/src/main/proto/PulsarApi.proto#L553



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -469,13 +462,6 @@ CompletableFuture<Void> 
acknowledgeCumulativeAsync(MessageId messageId,
      *
      * @param messageId {@link MessageId} to be individual acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId has been acked by another 
transaction
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     *             don't find batch size in consumer pending ack

Review Comment:
   if you want to delete this, please add the sync method in the consumer API



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -606,12 +625,21 @@ protected CompletableFuture<Void> 
doAcknowledgeWithTxn(List<MessageId> messageId
                                                            Map<String, Long> 
properties,
                                                            TransactionImpl 
txn) {
         CompletableFuture<Void> ackFuture;
-        if (txn != null) {
+        if (txn != null && this instanceof ConsumerImpl) {
+
+            // it is okay that we register acked topic after sending the 
acknowledgements. because
+            // the transactional ack will not be visiable for consumers until 
the transaction is
+            // committed
+            if (ackType == AckType.Cumulative) {
+                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+            }
+
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, 
ackType, properties, txn));

Review Comment:
   we don't need regist ack in this code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to