This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6a719480b2a [improve][client] Move acknowledge APIs to another
interface and improve docs (#18519)
6a719480b2a is described below
commit 6a719480b2afc84f5acb85fedf3accf8861eb97a
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 29 12:38:57 2022 +0800
[improve][client] Move acknowledge APIs to another interface and improve
docs (#18519)
---
.../org/apache/pulsar/client/api/Consumer.java | 228 +--------------------
.../pulsar/client/api/MessageAcknowledger.java | 156 ++++++++++++++
2 files changed, 157 insertions(+), 227 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index f727f21689b..3fbab236a60 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -19,12 +19,10 @@
package org.apache.pulsar.client.api;
import java.io.Closeable;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -35,7 +33,7 @@ import
org.apache.pulsar.common.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface Consumer<T> extends Closeable {
+public interface Consumer<T> extends Closeable, MessageAcknowledger {
/**
* Get a topic for the consumer.
@@ -156,42 +154,6 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Messages<T>> batchReceiveAsync();
- /**
- * Acknowledge the consumption of a single message.
- *
- * @param message
- * The {@code Message} to be acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledge(Message<?> message) throws PulsarClientException;
-
- /**
- * Acknowledge the consumption of a single message, identified by its
{@link MessageId}.
- *
- * @param messageId
- * The {@link MessageId} to be acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledge(MessageId messageId) throws PulsarClientException;
-
- /**
- * Acknowledge the consumption of {@link Messages}.
- *
- * @param messages messages
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledge(Messages<?> messages) throws PulsarClientException;
-
- /**
- * Acknowledge the consumption of a list of message.
- * @param messageIdList
- * @throws PulsarClientException
- */
- void acknowledge(List<MessageId> messageIdList) throws
PulsarClientException;
-
/**
* Acknowledge the failure to process a single message.
*
@@ -356,78 +318,6 @@ public interface Consumer<T> extends Closeable {
*/
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit)
throws PulsarClientException;
- /**
- * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
- *
- * <p>This method will block until the acknowledge has been sent to the
broker. After that, the messages will not be
- * re-delivered to this consumer.
- *
- * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
- *
- * <p>It's equivalent to calling asyncAcknowledgeCumulative(Message) and
waiting for the callback to be triggered.
- *
- * @param message
- * The {@code Message} to be cumulatively acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledgeCumulative(Message<?> message) throws
PulsarClientException;
-
- /**
- * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
- *
- * <p>This method will block until the acknowledge has been sent to the
broker. After that, the messages will not be
- * re-delivered to this consumer.
- *
- * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
- *
- * <p>It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and
waiting for the callback to be triggered.
- *
- * @param messageId
- * The {@code MessageId} to be cumulatively acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledgeCumulative(MessageId messageId) throws
PulsarClientException;
-
- /**
- * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message with this
- * transaction, it will store in transaction pending ack.
- *
- * <p>After the transaction commit, the end of previous transaction acked
message until this transaction
- * acked message will actually ack.
- *
- * <p>After the transaction abort, the end of previous transaction acked
message until this transaction
- * acked message will be redelivered to this consumer.
- *
- * <p>Cumulative acknowledge with transaction only support cumulative ack
and now have not support individual and
- * cumulative ack sharing.
- *
- * <p>If cumulative ack with a transaction success, we can cumulative ack
messageId with the same transaction
- * more than previous messageId.
- *
- * <p>It will not be allowed to cumulative ack with a transaction
different from the previous one when the previous
- * transaction haven't commit or abort.
- *
- * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
- *
- * @param messageId
- * The {@code MessageId} to be cumulatively acknowledged
- * @param txn {@link Transaction} the transaction to cumulative ack
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId is less than the messageId in
pending ack state or ack with transaction is
- * different from the transaction in pending ack.
- * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
- * @return {@link CompletableFuture} the future of the ack result
- *
- * @since 2.7.0
- */
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
- Transaction txn);
-
/**
* reconsumeLater the reception of all the messages in the stream up to
(and including) the provided message.
*
@@ -442,98 +332,6 @@ public interface Consumer<T> extends Closeable {
*/
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit
unit) throws PulsarClientException;
- /**
- * Asynchronously acknowledge the consumption of a single message.
- *
- * @param message
- * The {@code Message} to be acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeAsync(Message<?> message);
-
- /**
- * Asynchronously acknowledge the consumption of a single message.
- *
- * @param messageId
- * The {@code MessageId} to be acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
-
- /**
- * Asynchronously acknowledge the consumption of a single message, it will
store in pending ack.
- *
- * <p>After the transaction commit, the message will actually ack.
- *
- * <p>After the transaction abort, the message will be redelivered.
- *
- * @param messageId {@link MessageId} to be individual acknowledged
- * @param txn {@link Transaction} the transaction to cumulative ack
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId has been acked by another
transaction
- * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
- * don't find batch size in consumer pending ack
- * @return {@link CompletableFuture} the future of the ack result
- *
- * @since 2.7.0
- */
- CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction
txn);
-
- /**
- * Asynchronously acknowledge the consumption of {@link Messages}.
- *
- * @param messages
- * The {@link Messages} to be acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
-
-
- /**
- * Asynchronously acknowledge the consumption of {@link Messages}, it will
store in pending ack.
- * After the transaction commit, the message will actually ack.
- * After the transaction abort, the message will be redelivered.
- * @param messages
- * The {@link Messages} to be acknowledged
- * @param txn {@link Transaction} The transaction to ack messages.
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId is less than the messageId in
pending ack state or ack with transaction is
- * different from the transaction in pending ack.
- * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
- * @return {@link CompletableFuture} the future of the ack result
- * */
- CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction
txn);
-
-
- /**
- * Asynchronously acknowledge the consumption of a list of message.
- * @param messageIdList
- * @return
- */
- CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
-
-
- /**
- * Acknowledge the consumption of a list of message, it will store in
pending ack.
- * After the transaction commit, the message will actually ack.
- * After the transaction abort, the message will be redelivered.
- * @param messageIdList A list of message Id.
- * @param txn {@link Transaction} The transaction to ack messages.
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId is less than the messageId in
pending ack state or ack with transaction is
- * different from the transaction in pending ack.
- * @throws
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
- * @return {@link CompletableFuture} the future of the ack result */
- CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList,
Transaction txn);
/**
* Asynchronously reconsumeLater the consumption of a single message.
@@ -578,30 +376,6 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long
delayTime, TimeUnit unit);
- /**
- * Asynchronously Acknowledge the reception of all the messages in the
stream up to (and including) the provided
- * message.
- *
- * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
- *
- * @param message
- * The {@code Message} to be cumulatively acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
-
- /**
- * Asynchronously Acknowledge the reception of all the messages in the
stream up to (and including) the provided
- * message.
- *
- * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
- *
- * @param messageId
- * The {@code MessageId} to be cumulatively acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
-
/**
* Asynchronously ReconsumeLater the reception of all the messages in the
stream up to (and including) the provided
* message.
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
new file mode 100644
index 00000000000..c0a53983c5a
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.Transaction;
+
+/**
+ * The interface to acknowledge one or more messages individually or
cumulatively.
+ * <p>
+ * It contains two methods of various overloads:
+ * - `acknowledge`: acknowledge individually
+ * - `acknowledgeCumulative`: acknowledge cumulatively
+ * Each of them has an associated asynchronous API that has the "Async" suffix
in the name.
+ * <p>
+ * The 1st method parameter is
+ * - {@link MessageId} or {@link Message} when acknowledging a single message
+ * - {@link java.util.List} of {@link MessageId} or {@link Messages} when acknowledging
multiple messages
+ * <p>
+ * The 2nd method parameter is optional. Specify a non-null {@link
Transaction} instance for transaction usages:
+ * - After the transaction is committed, the message will be actually
acknowledged (individually or cumulatively).
+ * - After the transaction is aborted, the message will be redelivered.
+ * @see Transaction#commit()
+ * @see Transaction#abort()
+ */
+public interface MessageAcknowledger {
+
+ /**
+ * Acknowledge the consumption of a single message.
+ *
+ * @param messageId {@link MessageId} to be individually acknowledged
+ *
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledge(MessageId messageId) throws PulsarClientException;
+
+ default void acknowledge(Message<?> message) throws PulsarClientException {
+ acknowledge(message.getMessageId());
+ }
+
+ /**
+ * Acknowledge the consumption of a list of messages.
+ * @param messageIdList the list of message IDs.
+ */
+ void acknowledge(List<MessageId> messageIdList) throws
PulsarClientException;
+
+ default void acknowledge(Messages<?> messages) throws
PulsarClientException {
+ for (Message<?> message : messages) {
+ acknowledge(message.getMessageId());
+ }
+ }
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
+ *
+ * <p>This method will block until the acknowledge has been sent to the
broker. After that, the messages will not be
+ * re-delivered to this consumer.
+ *
+ * <p>Cumulative acknowledge cannot be used when the consumer type is set
to ConsumerShared.
+ *
+ * <p>It's equivalent to calling acknowledgeCumulativeAsync(MessageId) and
waiting for the callback to be triggered.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledgeCumulative(MessageId messageId) throws
PulsarClientException;
+
+ default void acknowledgeCumulative(Message<?> message) throws
PulsarClientException {
+ acknowledgeCumulative(message.getMessageId());
+ }
+
+ /**
+ * The asynchronous version of {@link #acknowledge(MessageId)} with
transaction support.
+ */
+ CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction
txn);
+
+ /**
+ * The asynchronous version of {@link #acknowledge(MessageId)}.
+ */
+ default CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
+ return acknowledgeAsync(messageId, null);
+ }
+
+ /**
+ * The asynchronous version of {@link #acknowledge(List)} with transaction
support.
+ */
+ CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList,
Transaction txn);
+
+ /**
+ * The asynchronous version of {@link #acknowledge(List)}.
+ */
+ CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
+
+ /**
+ * The asynchronous version of {@link #acknowledge(Message)}.
+ */
+ CompletableFuture<Void> acknowledgeAsync(Message<?> message);
+
+ /**
+ * The asynchronous version of {@link #acknowledge(Messages)}.
+ */
+ CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
+
+ /**
+ * The asynchronous version of {@link #acknowledge(Messages)} with
transaction support.
+ */
+ CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction
txn);
+
+ /**
+ * The asynchronous version of {@link #acknowledgeCumulative(MessageId)}
with transaction support.
+ *
+ * @apiNote It's not allowed to cumulative ack with a transaction
different from the previous one when the previous
+ * transaction is not committed or aborted.
+ * @apiNote It cannot be used for {@link SubscriptionType#Shared}
subscription.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @param txn {@link Transaction} the transaction to cumulative ack
+ */
+ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
+ Transaction txn);
+
+ /**
+ * The asynchronous version of {@link #acknowledgeCumulative(Message)}.
+ */
+ default CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?>
message) {
+ return acknowledgeCumulativeAsync(message.getMessageId());
+ }
+
+ /**
+ * The asynchronous version of {@link #acknowledgeCumulative(MessageId)}.
+ */
+ default CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId
messageId) {
+ return acknowledgeCumulativeAsync(messageId, null);
+ }
+}