This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 926834ef8b2 [improve][txn] Support ack message list for transaction 
(#15729)
926834ef8b2 is described below

commit 926834ef8b2b57d3964aa7e9773e6245bcee861c
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jul 5 12:42:23 2022 +0800

    [improve][txn] Support ack message list for transaction (#15729)
    
    * [improve][txn] Support ack message list for transaction
    ### Motivation
    Now, there is
    ```java
    doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
                                                               Map<String, 
Long> properties,
                                                               TransactionImpl 
txn)
    ```
    But there is no corresponding public interface
    ```java
    acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
    ```
---
 .../pulsar/broker/transaction/TransactionTest.java | 144 +++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     |  37 ++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  42 +++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  80 +++++++++++-
 .../apache/pulsar/common/protocol/Commands.java    |  20 ++-
 5 files changed, 314 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e95a811528c..78d499cbb62 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -38,6 +38,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,7 +107,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessagesImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -1020,6 +1023,147 @@ public class TransactionTest extends 
TransactionTestBase {
         transaction.commit().get();
     }
 
+    @Test(timeOut = 30000)
+    public void testTransactionAckMessageList() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 +"/test";
+        String subName = "testSub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().send();
+        }
+        //verify using aborted transaction to ack message list
+        List<MessageId> messages = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            Message<byte[]> message = consumer.receive();
+            messages.add(message.getMessageId());
+        }
+        Transaction transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+
+        consumer.acknowledgeAsync(messages, transaction);
+        transaction.abort().get();
+        consumer.close();
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        for (int i = 0; i < 4; i++) {
+            Message<byte[]> message = consumer.receive();
+            assertTrue(messages.contains(message.getMessageId()));
+        }
+
+        //verify using committed transaction to ack message list
+        transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+        consumer.acknowledgeAsync(messages, transaction);
+        transaction.commit().get();
+
+        consumer.close();
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(message);
+        Assert.assertFalse(messages.contains(message.getMessageId()));
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+        consumer.close();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void testTransactionAckMessages() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 
+"/testTransactionAckMessages";
+        String subName = "testSub";
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        for (int i = 0; i < 4; i++) {
+            producer.newMessage().send();
+        }
+        Method method = 
ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
+        method.setAccessible(true);
+
+        Field field = MessagesImpl.class.getDeclaredField("messageList");
+        field.setAccessible(true);
+
+        MessagesImpl<byte[]> messages = (MessagesImpl<byte[]>) 
method.invoke(consumer);
+
+        List<Message<byte[]>> messageList = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            Message<byte[]> message = consumer.receive();
+            messageList.add(message);
+        }
+        field.set(messages, messageList);
+        Transaction transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+
+        consumer.acknowledgeAsync(messages, transaction);
+        transaction.abort().get();
+        consumer.close();
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        List<MessageId> messageIds = new ArrayList<>();
+        for (Message message : messageList) {
+            messageIds.add(message.getMessageId());
+        }
+        for (int i = 0; i < 4; i++) {
+            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+            assertTrue(messageIds.contains(message.getMessageId()));
+        }
+
+        //verify using committed transaction to ack message list
+        transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+        consumer.acknowledgeAsync(messages, transaction);
+        transaction.commit().get();
+
+        consumer.close();
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+        consumer.close();
+    }
+
     @Test
     public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws 
Exception {
         String topic = NAMESPACE1 + 
"/testGetConnectExceptionForAckMsgWhenCnxIsNull";
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index b7a9dd9ef48..3017cc2b4a0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -491,6 +491,26 @@ public interface Consumer<T> extends Closeable {
      */
     CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
 
+
+    /**
+     * Asynchronously acknowledge the consumption of {@link Messages}; the acks are 
stored in the pending ack state.
+     * After the transaction commits, the messages are actually acknowledged.
+     * After the transaction aborts, the messages are redelivered.
+     * @param messages
+     *            The {@link Messages} to be acknowledged
+     * @param txn {@link Transaction} The transaction to ack messages.
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
+     *             different from the transaction in pending ack.
+     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+     *             if the broker does not support transactions
+     * @return {@link CompletableFuture} the future of the ack result
+     * */
+    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction 
txn);
+
+
     /**
      * Asynchronously acknowledge the consumption of a list of message.
      * @param messageIdList
@@ -498,6 +518,23 @@ public interface Consumer<T> extends Closeable {
      */
     CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
 
+
+    /**
+     * Asynchronously acknowledge the consumption of a list of messages; the acks are 
stored in the pending ack state.
+     * After the transaction commits, the messages are actually acknowledged.
+     * After the transaction aborts, the messages are redelivered.
+     * @param messageIdList A list of message Id.
+     * @param txn {@link Transaction} The transaction to ack messages.
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws 
org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+     *             if the ack with messageId is less than the messageId in 
pending ack state or ack with transaction is
+     *             different from the transaction in pending ack.
+     * @throws 
org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+     *             if the broker does not support transactions
+     * @return {@link CompletableFuture} the future of the ack result     */
+    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, 
Transaction txn);
+
     /**
      * Asynchronously reconsumeLater the consumption of a single message.
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0da17def097..aeb1b9ed7db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -485,6 +485,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
     @Override
     public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
+        return acknowledgeAsync(messages, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, 
Transaction txn) {
         List<MessageId> messageIds = new ArrayList<>(messages.size());
         for (Message<?> message: messages) {
             try {
@@ -494,7 +499,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             }
             messageIds.add(message.getMessageId());
         }
-        return acknowledgeAsync(messageIds);
+        if (txn != null) {
+            return acknowledgeAsync(messageIds, txn);
+        } else {
+            return acknowledgeAsync(messageIds);
+        }
     }
 
     @Override
@@ -502,6 +511,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         return doAcknowledgeWithTxn(messageIdList, AckType.Individual, 
Collections.emptyMap(), null);
     }
 
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(List<MessageId> 
messageIdList, Transaction txn) {
+        return doAcknowledgeWithTxn(messageIdList, AckType.Individual, 
Collections.emptyMap(), (TransactionImpl) txn);
+    }
+
     @Override
     public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, 
long delayTime, TimeUnit unit) {
         return reconsumeLaterAsync(message, null, delayTime, unit);
@@ -611,12 +625,21 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
                                                            Map<String, Long> 
properties,
                                                            TransactionImpl 
txn) {
         CompletableFuture<Void> ackFuture;
-        if (txn != null) {
+        if (txn != null && this instanceof ConsumerImpl) {
+
+            // It is okay that we register the acked topic after sending the 
acknowledgements, because
+            // the transactional ack will not be visible to consumers until 
the transaction is
+            // committed.
+            if (ackType == AckType.Cumulative) {
+                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+            }
+
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, 
ackType, properties, txn));
+            // register the ackFuture as part of the transaction
             txn.registerAckOp(ackFuture);
         } else {
-            ackFuture = doAcknowledge(messageIdList, ackType, properties, 
null);
+            ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
         }
         return ackFuture;
     }
@@ -787,12 +810,25 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         }
     }
 
+    protected void onAcknowledge(List<MessageId> messageIds, Throwable 
exception) {
+        if (interceptors != null) {
+            messageIds.forEach(messageId -> interceptors.onAcknowledge(this, 
messageId, exception));
+        }
+    }
+
     protected void onAcknowledgeCumulative(MessageId messageId, Throwable 
exception) {
         if (interceptors != null) {
             interceptors.onAcknowledgeCumulative(this, messageId, exception);
         }
     }
 
+    protected void onAcknowledgeCumulative(List<MessageId> messageIds, 
Throwable exception) {
+        if (interceptors != null) {
+            messageIds.forEach(messageId -> 
interceptors.onAcknowledgeCumulative(this, messageId, exception));
+        }
+    }
+
+
     protected void onNegativeAcksSend(Set<MessageId> messageIds) {
         if (interceptors != null) {
             interceptors.onNegativeAcksSend(this, messageIds);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5228810f6cf..c9cf4a12b75 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -587,11 +588,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList,
-                                                    AckType ackType,
-                                                    Map<String, Long> 
properties,
-                                                    TransactionImpl txn) {
-        return 
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, 
ackType, properties);
+    protected CompletableFuture<Void> doAcknowledge(List<MessageId> 
messageIdList, AckType ackType,
+                                                    Map<String, Long> 
properties, TransactionImpl txn) {
+
+        for (MessageId messageId : messageIdList) {
+            checkArgument(messageId instanceof MessageIdImpl);
+        }
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageIdList, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageIdList, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (txn != null) {
+            return doTransactionAcknowledgeForResponse(messageIdList, ackType, 
null,
+                    properties, new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
+        } else {
+            return 
this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, 
ackType, properties);
+        }
     }
 
 
@@ -2704,6 +2722,58 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
+    private CompletableFuture<Void> 
doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        
ValidationError validationError,
+                                                                        
Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 
1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                }
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(batchMessageId.getLedgerId());
+                messageIdData.setEntryId(batchMessageId.getEntryId());
+                messageIdData.setBatchSize(batchMessageId.getBatchSize());
+                long[] as = bitSetRecyclable.toLongArray();
+                for (int i = 0; i < as.length; i++) {
+                    messageIdData.addAckSet(as[i]);
+                }
+                bitSetRecyclable.recycle();
+                messageIdDataList.add(messageIdData);
+            } else {
+                MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+                ledgerId = singleMessage.getLedgerId();
+                entryId = singleMessage.getEntryId();
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(ledgerId);
+                messageIdData.setEntryId(entryId);
+                messageIdDataList.add(messageIdData);
+            }
+
+            if (ackType == AckType.Cumulative) {
+                unAckedMessageTracker.removeMessagesTill(messageId);
+            } else {
+                unAckedMessageTracker.remove(messageId);
+            }
+        }
+        cmd = Commands.newAck(consumerId, messageIdDataList, ackType, 
validationError, properties,
+                txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
+        return cnx().newAckForReceipt(cmd, requestId);
+    }
+
     public Map<MessageIdImpl, List<MessageImpl<T>>> 
getPossibleSendToDeadLetterTopicMessages() {
         return possibleSendToDeadLetterTopicMessages;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d8151579353..62243063741 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -990,9 +990,26 @@ public class Commands {
         }
 
         if (batchSize >= 0) {
-          messageIdData.setBatchSize(batchSize);
+            messageIdData.setBatchSize(batchSize);
         }
 
+        return newAck(validationError, properties, txnIdLeastBits, 
txnIdMostBits, requestId, ack, cmd);
+    }
+
+    public static ByteBuf newAck(long consumerId, List<MessageIdData> 
messageIds, AckType ackType,
+                                 ValidationError validationError, Map<String, 
Long> properties, long txnIdLeastBits,
+                                 long txnIdMostBits, long requestId) {
+        BaseCommand cmd = localCmd(Type.ACK);
+        CommandAck ack = cmd.setAck()
+                .setConsumerId(consumerId)
+                .setAckType(ackType);
+        ack.addAllMessageIds(messageIds);
+
+        return newAck(validationError, properties, txnIdLeastBits, 
txnIdMostBits, requestId, ack, cmd);
+    }
+
+    private static ByteBuf newAck(ValidationError validationError, Map<String, 
Long> properties, long txnIdLeastBits,
+                                  long txnIdMostBits, long requestId, 
CommandAck ack, BaseCommand cmd) {
         if (validationError != null) {
             ack.setValidationError(validationError);
         }
@@ -1014,6 +1031,7 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, 
BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, 
Long> properties, long txnIdLeastBits,
                                  long txnIdMostBits, long requestId) {

Reply via email to