This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dae4e013c75 [fix][txn] fix the consumer stuck due to deduplicated
messages in pending ack state (#21177)
dae4e013c75 is described below
commit dae4e013c757e1da3dada2af3c34ee15fd052b55
Author: hrzzzz <[email protected]>
AuthorDate: Wed Sep 27 15:45:20 2023 +0800
[fix][txn] fix the consumer stuck due to deduplicated messages in pending
ack state (#21177)
---
.../broker/service/AbstractBaseDispatcher.java | 11 ++--
.../client/impl/TransactionEndToEndTest.java | 59 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 171cbc3bf4b..e778955f8f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -213,12 +213,7 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
this.filterAcceptedMsgs.add(entryMsgCnt);
}
- totalEntries++;
int batchSize = msgMetadata.getNumMessagesInBatch();
- totalMessages += batchSize;
- totalBytes += metadataAndPayload.readableBytes();
- totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
- batchSizes.setBatchSize(i, batchSize);
long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
PositionImpl position = PositionImpl.get(entry.getLedgerId(),
entry.getEntryId());
@@ -262,6 +257,12 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
}
}
+ totalEntries++;
+ totalMessages += batchSize;
+ totalBytes += metadataAndPayload.readableBytes();
+ totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
+ batchSizes.setBatchSize(i, batchSize);
+
BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
// keep for compatibility if users has implemented the old interface
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 34cc3bc1ca5..348fb04b7dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -254,6 +254,65 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Assert.assertEquals(receiveCounter, count / 2);
}
+ @Test
+ private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws Exception {
+ final String topicName = NAMESPACE1 + "/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
+ final String subscription = "test";
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+ @Cleanup
+ Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ int numStep1Receive = 2, numStep2Receive = 2, numStep3Receive = 2;
+ int numTotalMessage = numStep1Receive + numStep2Receive + numStep3Receive;
+
+ for (int i = 0; i < numTotalMessage; i++) {
+ producer.send(i);
+ }
+
+ Transaction step1Txn = getTxn();
+ Transaction step2Txn = getTxn();
+
+ // Step 1, try to consume some messages but do not commit the transaction
+ for (int i = 0; i < numStep1Receive; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), step1Txn).get();
+ }
+
+ // Step 2, try to consume some messages and commit the transaction
+ for (int i = 0; i < numStep2Receive; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), step2Txn).get();
+ }
+
+ // commit step2Txn
+ step2Txn.commit().get();
+
+ // close and re-create consumer
+ consumer.close();
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .receiverQueueSize(numStep3Receive)
+ .subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ // Step 3, try to consume the rest messages and should receive all of them
+ for (int i = 0; i < numStep3Receive; i++) {
+ // should get the message instead of timeout
+ Message<Integer> msg = consumer2.receive(3, TimeUnit.SECONDS);
+ Assert.assertEquals(msg.getValue(), numStep1Receive + numStep2Receive + i);
+ }
+ }
+
@Test(dataProvider="enableBatch")
private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup