This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 4d7876b5cd5 [fix] [txn] fix consumer can receive aborted txn message 
when readType is replay (#19815)
4d7876b5cd5 is described below

commit 4d7876b5cd52784fabd510fcfe0158db5a6ea528
Author: ken <[email protected]>
AuthorDate: Wed Jul 5 17:00:57 2023 +0800

    [fix] [txn] fix consumer can receive aborted txn message when readType is 
replay (#19815)
    
    Co-authored-by: fanjianye <[email protected]>
    (cherry picked from commit ac33311b7dfccdc88d560bbfd33c5cd968881717)
---
 .../broker/service/AbstractBaseDispatcher.java     |  2 +-
 .../broker/transaction/TransactionConsumeTest.java | 60 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 0b5108eeab8..e54a3332a49 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -183,7 +183,7 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
                     continue;
                 }
             }
-            if (!isReplayRead && msgMetadata != null && 
msgMetadata.hasTxnidMostBits()
+            if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than 
maxReadPosition,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 24e5c5aaf1e..6ff3053e2bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,9 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -225,6 +229,62 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         log.info("TransactionConsumeTest sortedTest finish.");
     }
 
+    @Test
+    public void testMessageRedelivery() throws Exception {
+        int transactionMessageCnt = 10;
+        String subName = "shared-test";
+
+        @Cleanup
+        Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
+                .topic(CONSUME_TOPIC)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Awaitility.await().until(sharedConsumer::isConnected);
+
+        long mostSigBits = 2L;
+        long leastSigBits = 5L;
+        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+
+        // produce batch message with txn and then abort
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(CONSUME_TOPIC, false).get().get();
+
+        List<String> sendMessageList = new ArrayList<>();
+        List<MessageIdData> messageIdDataList = 
appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, 
sendMessageList);
+
+        persistentTopic.endTxn(txnID, TxnAction.ABORT_VALUE, 0L).get();
+        log.info("Abort txn.");
+
+        // redeliver transaction messages to shared consumer
+        PersistentSubscription subRef = 
persistentTopic.getSubscription(subName);
+        PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) subRef
+                .getDispatcher();
+        Field redeliveryMessagesField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        MessageRedeliveryController redeliveryMessages = new 
MessageRedeliveryController(true);
+
+        final Field totalAvailablePermitsField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("totalAvailablePermits");
+        totalAvailablePermitsField.setAccessible(true);
+        totalAvailablePermitsField.set(dispatcher, 1000);
+
+        for (MessageIdData messageIdData : messageIdDataList) {
+            redeliveryMessages.add(messageIdData.getLedgerId(), 
messageIdData.getEntryId());
+        }
+
+        redeliveryMessagesField.set(dispatcher, redeliveryMessages);
+        dispatcher.readMoreEntries();
+
+        // shared consumer should not receive the redelivered aborted 
transaction messages
+        Message message = sharedConsumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        log.info("TransactionConsumeTest testMessageRedelivery finish.");
+    }
+
     private void sendNormalMessages(Producer<byte[]> producer, int startMsgCnt,
                                     int messageCnt, List<String> 
sendMessageList)
             throws PulsarClientException {

Reply via email to