This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 77ff53bab90 [fix] [txn] fix consumer can receive aborted txn message 
when readType is replay (#19815)
77ff53bab90 is described below

commit 77ff53bab90405bc501878946833dfa1498edc97
Author: ken <[email protected]>
AuthorDate: Wed Jul 5 17:00:57 2023 +0800

    [fix] [txn] fix consumer can receive aborted txn message when readType is 
replay (#19815)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../broker/service/AbstractBaseDispatcher.java     |  2 +-
 .../broker/transaction/TransactionConsumeTest.java | 60 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d6184c657c4..c9bf2d6770f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -164,7 +164,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                 entry.release();
                 continue;
             }
-            if (!isReplayRead && msgMetadata != null && 
msgMetadata.hasTxnidMostBits()
+            if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than 
maxReadPosition,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 32efdb51659..847eb31da4f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,9 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -223,6 +227,62 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         log.info("TransactionConsumeTest sortedTest finish.");
     }
 
+    @Test
+    public void testMessageRedelivery() throws Exception {
+        int transactionMessageCnt = 10;
+        String subName = "shared-test";
+
+        @Cleanup
+        Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
+                .topic(CONSUME_TOPIC)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Awaitility.await().until(sharedConsumer::isConnected);
+
+        long mostSigBits = 2L;
+        long leastSigBits = 5L;
+        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+
+        // produce batch message with txn and then abort
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(CONSUME_TOPIC, false).get().get();
+
+        List<String> sendMessageList = new ArrayList<>();
+        List<MessageIdData> messageIdDataList = 
appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, 
sendMessageList);
+
+        persistentTopic.endTxn(txnID, TxnAction.ABORT_VALUE, 0L).get();
+        log.info("Abort txn.");
+
+        // redeliver transaction messages to shared consumer
+        PersistentSubscription subRef = 
persistentTopic.getSubscription(subName);
+        PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) subRef
+                .getDispatcher();
+        Field redeliveryMessagesField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        MessageRedeliveryController redeliveryMessages = new 
MessageRedeliveryController(true);
+
+        final Field totalAvailablePermitsField = 
PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("totalAvailablePermits");
+        totalAvailablePermitsField.setAccessible(true);
+        totalAvailablePermitsField.set(dispatcher, 1000);
+
+        for (MessageIdData messageIdData : messageIdDataList) {
+            redeliveryMessages.add(messageIdData.getLedgerId(), 
messageIdData.getEntryId());
+        }
+
+        redeliveryMessagesField.set(dispatcher, redeliveryMessages);
+        dispatcher.readMoreEntries();
+
+        // shared consumer should not receive the redelivered aborted 
transaction messages
+        Message message = sharedConsumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        log.info("TransactionConsumeTest testMessageRedelivery finish.");
+    }
+
     private void sendNormalMessages(Producer<byte[]> producer, int startMsgCnt,
                                     int messageCnt, List<String> 
sendMessageList)
             throws PulsarClientException {

Reply via email to