This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 31880ec2eb5 [fix][txn] fix the consumer stuck due to deduplicated 
messages in pending ack state (#21177)
31880ec2eb5 is described below

commit 31880ec2eb571c4b38097d6e0e8e679200918b6b
Author: hrzzzz <[email protected]>
AuthorDate: Wed Sep 27 15:45:20 2023 +0800

    [fix][txn] fix the consumer stuck due to deduplicated messages in pending 
ack state (#21177)
---
 .../broker/service/AbstractBaseDispatcher.java     | 11 ++--
 .../client/impl/TransactionEndToEndTest.java       | 59 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 50f05f80e3f..eb8b0151395 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -213,12 +213,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                 this.filterAcceptedMsgs.add(entryMsgCnt);
             }
 
-            totalEntries++;
             int batchSize = msgMetadata.getNumMessagesInBatch();
-            totalMessages += batchSize;
-            totalBytes += metadataAndPayload.readableBytes();
-            totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
-            batchSizes.setBatchSize(i, batchSize);
             long[] ackSet = null;
             if (indexesAcks != null && cursor != null) {
                 PositionImpl position = PositionImpl.get(entry.getLedgerId(), 
entry.getEntryId());
@@ -262,6 +257,12 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                 }
             }
 
+            totalEntries++;
+            totalMessages += batchSize;
+            totalBytes += metadataAndPayload.readableBytes();
+            totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
+            batchSizes.setBatchSize(i, batchSize);
+
             BrokerInterceptor interceptor = subscription.interceptor();
             if (null != interceptor) {
                 // keep for compatibility if users has implemented the old 
interface
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 34cc3bc1ca5..348fb04b7dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -254,6 +254,65 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Assert.assertEquals(receiveCounter, count / 2);
     }
 
+    @Test
+    private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws 
Exception {
+        final String topicName = NAMESPACE1 + 
"/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
+        final String subscription = "test";
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        int numStep1Receive = 2, numStep2Receive = 2, numStep3Receive = 2;
+        int numTotalMessage = numStep1Receive + numStep2Receive + 
numStep3Receive;
+
+        for (int i = 0; i < numTotalMessage; i++) {
+            producer.send(i);
+        }
+
+        Transaction step1Txn = getTxn();
+        Transaction step2Txn = getTxn();
+
+        // Step 1, try to consume some messages but do not commit the 
transaction
+        for (int i = 0; i < numStep1Receive; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
step1Txn).get();
+        }
+
+        // Step 2, try to consume some messages and commit the transaction
+        for (int i = 0; i < numStep2Receive; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
step2Txn).get();
+        }
+
+        // commit step2Txn
+        step2Txn.commit().get();
+
+        // close and re-create consumer
+        consumer.close();
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .receiverQueueSize(numStep3Receive)
+                .subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        // Step 3, try to consume the rest messages and should receive all of 
them
+        for (int i = 0; i < numStep3Receive; i++) {
+            // should get the message instead of timeout
+            Message<Integer> msg = consumer2.receive(3, TimeUnit.SECONDS);
+            Assert.assertEquals(msg.getValue(), numStep1Receive + 
numStep2Receive + i);
+        }
+    }
+
     @Test(dataProvider="enableBatch")
     private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup

Reply via email to