This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a719480b2a [improve][client] Move acknowledge APIs to another 
interface and improve docs (#18519)
6a719480b2a is described below

commit 6a719480b2afc84f5acb85fedf3accf8861eb97a
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 29 12:38:57 2022 +0800

    [improve][client] Move acknowledge APIs to another interface and improve 
docs (#18519)
---
 .../org/apache/pulsar/client/api/Consumer.java     | 228 +--------------------
 .../pulsar/client/api/MessageAcknowledger.java     | 156 ++++++++++++++
 2 files changed, 157 insertions(+), 227 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index f727f21689b..3fbab236a60 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -19,12 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -35,7 +33,7 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public interface Consumer<T> extends Closeable {
+public interface Consumer<T> extends Closeable, MessageAcknowledger {
 
     /**
      * Get a topic for the consumer.
@@ -156,42 +154,6 @@ public interface Consumer<T> extends Closeable {
      */
     CompletableFuture<Messages<T>> batchReceiveAsync();
 
-    /**
-     * Acknowledge the consumption of a single message.
-     *
-     * @param message
-     *            The {@code Message} to be acknowledged
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     */
-    void acknowledge(Message<?> message) throws PulsarClientException;
-
-    /**
-     * Acknowledge the consumption of a single message, identified by its 
{@link MessageId}.
-     *
-     * @param messageId
-     *            The {@link MessageId} to be acknowledged
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     */
-    void acknowledge(MessageId messageId) throws PulsarClientException;
-
-    /**
-     * Acknowledge the consumption of {@link Messages}.
-     *
-     * @param messages messages
-     * @throws PulsarClientException.AlreadyClosedException
-     *              if the consumer was already closed
-     */
-    void acknowledge(Messages<?> messages) throws PulsarClientException;
-
-    /**
-     * Acknowledge the consumption of a list of message.
-     * @param messageIdList
-     * @throws PulsarClientException
-     */
-    void acknowledge(List<MessageId> messageIdList) throws 
PulsarClientException;
-
     /**
      * Acknowledge the failure to process a single message.
      *
@@ -356,78 +318,6 @@ public interface Consumer<T> extends Closeable {
      */
     void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) 
throws PulsarClientException;
 
-    /**
-     * Acknowledge the reception of all the messages in the stream up to (and 
including) the provided message.
-     *
-     * <p>This method will block until the acknowledge has been sent to the 
broker. After that, the messages will not be
-     * re-delivered to this consumer.
-     *
-     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-     *
-     * <p>It's equivalent to calling asyncAcknowledgeCumulative(Message) and 
waiting for the callback to be triggered.
-     *
-     * @param message
-     *            The {@code Message} to be cumulatively acknowledged
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     */
-    void acknowledgeCumulative(Message<?> message) throws 
PulsarClientException;
-
-    /**
-     * Acknowledge the reception of all the messages in the stream up to (and 
including) the provided message.
-     *
-     * <p>This method will block until the acknowledge has been sent to the 
broker. After that, the messages will not be
-     * re-delivered to this consumer.
-     *
-     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-     *
-     * <p>It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and 
waiting for the callback to be triggered.
-     *
-     * @param messageId
-     *            The {@code MessageId} to be cumulatively acknowledged
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     */
-    void acknowledgeCumulative(MessageId messageId) throws 
PulsarClientException;
-
-    /**
-     * Acknowledge the reception of all the messages in the stream up to (and 
including) the provided message with this
-     * transaction, it will store in transaction pending ack.
-     *
-     * <p>After the transaction commit, the end of previous transaction acked 
message until this transaction
-     * acked message will actually ack.
-     *
-     * <p>After the transaction abort, the end of previous transaction acked 
message until this transaction
-     * acked message will be redelivered to this consumer.
-     *
-     * <p>Cumulative acknowledge with transaction only support cumulative ack 
and now have not support individual and
-     * cumulative ack sharing.
-     *
-     * <p>If cumulative ack with a transaction success, we can cumulative ack 
messageId with the same transaction
-     * more than previous messageId.
-     *
-     * <p>It will not be allowed to cumulative ack with a transaction 
different from the previous one when the previous
-     * transaction haven't commit or abort.
-     *
-     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-     *
-     * @param messageId
-     *            The {@code MessageId} to be cumulatively acknowledged
-     * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     * @return {@link CompletableFuture} the future of the ack result
-     *
-     * @since 2.7.0
-     */
-    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
-                                                       Transaction txn);
-
     /**
      * reconsumeLater the reception of all the messages in the stream up to 
(and including) the provided message.
      *
@@ -442,98 +332,6 @@ public interface Consumer<T> extends Closeable {
      */
     void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit 
unit) throws PulsarClientException;
 
-    /**
-     * Asynchronously acknowledge the consumption of a single message.
-     *
-     * @param message
-     *            The {@code Message} to be acknowledged
-     * @return a future that can be used to track the completion of the 
operation
-     */
-    CompletableFuture<Void> acknowledgeAsync(Message<?> message);
-
-    /**
-     * Asynchronously acknowledge the consumption of a single message.
-     *
-     * @param messageId
-     *            The {@code MessageId} to be acknowledged
-     * @return a future that can be used to track the completion of the 
operation
-     */
-    CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
-
-    /**
-     * Asynchronously acknowledge the consumption of a single message, it will 
store in pending ack.
-     *
-     * <p>After the transaction commit, the message will actually ack.
-     *
-     * <p>After the transaction abort, the message will be redelivered.
-     *
-     * @param messageId {@link MessageId} to be individual acknowledged
-     * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId has been acked by another 
transaction
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     *             don't find batch size in consumer pending ack
-     * @return {@link CompletableFuture} the future of the ack result
-     *
-     * @since 2.7.0
-     */
-    CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction 
txn);
-
-    /**
-     * Asynchronously acknowledge the consumption of {@link Messages}.
-     *
-     * @param messages
-     *            The {@link Messages} to be acknowledged
-     * @return a future that can be used to track the completion of the 
operation
-     */
-    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
-
-
-    /**
-     * Asynchronously acknowledge the consumption of {@link Messages}, it will 
store in pending ack.
-     * After the transaction commit, the message will actually ack.
-     * After the transaction abort, the message will be redelivered.
-     * @param messages
-     *            The {@link Messages} to be acknowledged
-     * @param txn {@link Transaction} The transaction to ack messages.
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     * @return {@link CompletableFuture} the future of the ack result
-     * */
-    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction 
txn);
-
-
-    /**
-     * Asynchronously acknowledge the consumption of a list of message.
-     * @param messageIdList
-     * @return
-     */
-    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
-
-
-    /**
-     * Acknowledge the consumption of a list of message, it will store in 
pending ack.
-     * After the transaction commit, the message will actually ack.
-     * After the transaction abort, the message will be redelivered.
-     * @param messageIdList A list of message Id.
-     * @param txn {@link Transaction} The transaction to ack messages.
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     * @return {@link CompletableFuture} the future of the ack result     */
-    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, 
Transaction txn);
 
     /**
      * Asynchronously reconsumeLater the consumption of a single message.
@@ -578,30 +376,6 @@ public interface Consumer<T> extends Closeable {
      */
     CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long 
delayTime, TimeUnit unit);
 
-    /**
-     * Asynchronously Acknowledge the reception of all the messages in the 
stream up to (and including) the provided
-     * message.
-     *
-     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-     *
-     * @param message
-     *            The {@code Message} to be cumulatively acknowledged
-     * @return a future that can be used to track the completion of the 
operation
-     */
-    CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
-
-    /**
-     * Asynchronously Acknowledge the reception of all the messages in the 
stream up to (and including) the provided
-     * message.
-     *
-     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-     *
-     * @param messageId
-     *            The {@code MessageId} to be cumulatively acknowledged
-     * @return a future that can be used to track the completion of the 
operation
-     */
-    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
-
     /**
      * Asynchronously ReconsumeLater the reception of all the messages in the 
stream up to (and including) the provided
      * message.
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
new file mode 100644
index 00000000000..c0a53983c5a
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.Transaction;
+
+/**
+ * The interface to acknowledge one or more messages individually or 
cumulatively.
+ * <p>
+ * It contains two methods of various overloads:
+ * - `acknowledge`: acknowledge individually
+ * - `acknowledgeCumulative`: acknowledge cumulatively
+ * Each of them has an associated asynchronous API that has the "Async" suffix 
in the name.
+ * <p>
+ * The 1st method parameter is
+ * - {@link MessageId} or {@link Message} when acknowledging a single message
+ * - {@link java.util.List<MessageId>} or {@link Messages} when acknowledging 
multiple messages
+ * <p>
+ * The 2nd method parameter is optional. Specify a non-null {@link 
Transaction} instance for transaction usages:
+ * - After the transaction is committed, the message will be actually 
acknowledged (individually or cumulatively).
+ * - After the transaction is aborted, the message will be redelivered.
+ * @see Transaction#commit()
+ * @see Transaction#abort()
+ */
+public interface MessageAcknowledger {
+
+    /**
+     * Acknowledge the consumption of a single message.
+     *
+     * @param messageId {@link MessageId} to be individual acknowledged
+     *
+     * @throws PulsarClientException.AlreadyClosedException}
+     *             if the consumer was already closed
+     */
+    void acknowledge(MessageId messageId) throws PulsarClientException;
+
+    default void acknowledge(Message<?> message) throws PulsarClientException {
+        acknowledge(message.getMessageId());
+    }
+
+    /**
+     * Acknowledge the consumption of a list of message.
+     * @param messageIdList the list of message IDs.
+     */
+    void acknowledge(List<MessageId> messageIdList) throws 
PulsarClientException;
+
+    default void acknowledge(Messages<?> messages) throws 
PulsarClientException {
+        for (Message<?> message : messages) {
+            acknowledge(message.getMessageId());
+        }
+    }
+
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and 
including) the provided message.
+     *
+     * <p>This method will block until the acknowledge has been sent to the 
broker. After that, the messages will not be
+     * re-delivered to this consumer.
+     *
+     * <p>Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
+     *
+     * <p>It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and 
waiting for the callback to be triggered.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     */
+    void acknowledgeCumulative(MessageId messageId) throws 
PulsarClientException;
+
+    default void acknowledgeCumulative(Message<?> message) throws 
PulsarClientException {
+        acknowledgeCumulative(message.getMessageId());
+    }
+
+    /**
+     * The asynchronous version of {@link #acknowledge(MessageId)} with 
transaction support.
+     */
+    CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction 
txn);
+
+    /**
+     * The asynchronous version of {@link #acknowledge(MessageId)}.
+     */
+    default CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
+        return acknowledgeAsync(messageId, null);
+    }
+
+    /**
+     * The asynchronous version of {@link #acknowledge(List)} with transaction 
support.
+     */
+    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, 
Transaction txn);
+
+    /**
+     * The asynchronous version of {@link #acknowledge(List)}.
+     */
+    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
+
+    /**
+     * The asynchronous version of {@link #acknowledge(Message)}.
+     */
+    CompletableFuture<Void> acknowledgeAsync(Message<?> message);
+
+    /**
+     * The asynchronous version of {@link #acknowledge(Messages)}.
+     */
+    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
+
+    /**
+     * The asynchronous version of {@link #acknowledge(Messages)} with 
transaction support.
+     */
+    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction 
txn);
+
+    /**
+     * The asynchronous version of {@link #acknowledgeCumulative(MessageId)} 
with transaction support.
+     *
+     * @apiNote It's not allowed to cumulative ack with a transaction 
different from the previous one when the previous
+     * transaction is not committed or aborted.
+     * @apiNote It cannot be used for {@link SubscriptionType#Shared} 
subscription.
+     *
+     * @param messageId
+     *            The {@code MessageId} to be cumulatively acknowledged
+     * @param txn {@link Transaction} the transaction to cumulative ack
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
+                                                       Transaction txn);
+
+    /**
+     * The asynchronous version of {@link #acknowledgeCumulative(Message)}.
+     */
+    default CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> 
message) {
+        return acknowledgeCumulativeAsync(message.getMessageId());
+    }
+
+    /**
+     * The asynchronous version of {@link #acknowledgeCumulative(MessageId)}.
+     */
+    default CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId 
messageId) {
+        return acknowledgeCumulativeAsync(messageId, null);
+    }
+}

Reply via email to