congbobo184 commented on code in PR #21177:
URL: https://github.com/apache/pulsar/pull/21177#discussion_r1331575223
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java:
##########
@@ -254,6 +254,62 @@ private void
testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl
Assert.assertEquals(receiveCounter, count / 2);
}
+ @Test
+ private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws
Exception {
+ final String topicName = NAMESPACE1 +
"/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
+ final String subscription = "test";
+
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+ Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ int numStep1Receive = 2, numStep2Receive = 2, numStep3Receive = 2;
+ int numTotalMessage = numStep1Receive + numStep2Receive +
numStep3Receive;
+
+ for (int i = 0; i < numTotalMessage; i++) {
+ producer.send(i);
+ }
+
+ Transaction step1Txn = getTxn();
+ Transaction step2Txn = getTxn();
+
+ // Step 1, try to consume some messages but do not commit the
transaction
+ for (int i = 0; i < numStep1Receive; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
step1Txn).get();
+ }
+
+ // Step 2, try to consume some messages and commit the transaction
+ for (int i = 0; i < numStep2Receive; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
step2Txn).get();
+ }
+
+ // commit step2Txn
+ step2Txn.commit().get();
+
+ // close and re-create consumer
+ consumer.close();
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .receiverQueueSize(numStep3Receive)
+ .subscriptionName(subscription)
+ .subscriptionType(SubscriptionType.Shared)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ // Step 3, try to consume the rest messages and should receive all of
them
+ for (int i = 0; i < numStep3Receive; i++) {
+ // should get the message instead of timeout
+ Message<Integer> msg = consumer2.receive(10, TimeUnit.SECONDS);
Review Comment:
A receive timeout of 3 seconds is enough here; it does not need to be 10.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]