congbobo184 commented on code in PR #21177:
URL: https://github.com/apache/pulsar/pull/21177#discussion_r1331575223


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java:
##########
@@ -254,6 +254,62 @@ private void 
testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enabl
         Assert.assertEquals(receiveCounter, count / 2);
     }
 
+    @Test
+    private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws 
Exception {
+        final String topicName = NAMESPACE1 + 
"/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
+        final String subscription = "test";
+
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        int numStep1Receive = 2, numStep2Receive = 2, numStep3Receive = 2;
+        int numTotalMessage = numStep1Receive + numStep2Receive + 
numStep3Receive;
+
+        for (int i = 0; i < numTotalMessage; i++) {
+            producer.send(i);
+        }
+
+        Transaction step1Txn = getTxn();
+        Transaction step2Txn = getTxn();
+
+        // Step 1, try to consume some messages but do not commit the 
transaction
+        for (int i = 0; i < numStep1Receive; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
step1Txn).get();
+        }
+
+        // Step 2, try to consume some messages and commit the transaction
+        for (int i = 0; i < numStep2Receive; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
step2Txn).get();
+        }
+
+        // commit step2Txn
+        step2Txn.commit().get();
+
+        // close and re-create consumer
+        consumer.close();
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .receiverQueueSize(numStep3Receive)
+                .subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        // Step 3, try to consume the rest messages and should receive all of 
them
+        for (int i = 0; i < numStep3Receive; i++) {
+            // should get the message instead of timeout
+            Message<Integer> msg = consumer2.receive(10, TimeUnit.SECONDS);

Review Comment:
   A timeout of 3 seconds should be enough here; there is no need to wait 10 seconds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to