lhotari commented on code in PR #21657:
URL: https://github.com/apache/pulsar/pull/21657#discussion_r1758694609


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##########
@@ -1630,4 +1629,66 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
         log.info("Got {} other messages...", sum);
         Assert.assertEquals(sum, delayedMessages + messages);
     }
+
+    /**
+     * This test validates key-shared subscription should not get stuck and if one of the consumer dies then broker
+     * should redeliver those messages to the new consumer instead stop dispatching messages to all consumers.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testKeySharedMessageRedeliveryWithoutStuck()
+            throws Exception {
+        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        boolean enableBatch = false;
+        Set<Integer> values = new HashSet<>();
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, enableBatch);
+        int count = 0;
+        for (int i = 0; i < 10; i++) {
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        for (int i = 0; i < 10; i++) {
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        consumer2.redeliverUnacknowledgedMessages();
+
+        for (int i = 0; i < 10; i++) {
+            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+            producer.newMessage().key(key).value(count++).send();
+        }
+        consumer1.close();
+
+        for(int i = 0; i < count; i++) {
+            Message<Integer> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
+            if (msg!=null) {
+                values.add(msg.getValue());
+            } else {
+                break;
+            }
+        }
+        for(int i = 0; i < count; i++) {
+            Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
+            if (msg!=null) {
+                values.add(msg.getValue());
+            } else {
+                break;
+            }
+        }

Review Comment:
   This test is currently invalid. Messages would need to be acknowledged and
consumed concurrently. The test passes at least in branch-3.3 when making the
changes.
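
To illustrate what "acknowledged and consumed concurrently" could look like, here is a minimal sketch that drains several consumers in parallel and acknowledges each message as it arrives, instead of reading the consumers one after another without acknowledging. It is not the PR's code: `Source`, `InMemorySource`, and `drainConcurrently` are hypothetical names, and the in-memory queue only stands in for a broker so the sketch runs on its own. In the actual test, the Pulsar `Consumer#receive(int, TimeUnit)` and `Consumer#acknowledge(Message)` calls would presumably take the place of the two interface methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentDrainSketch {

    /** Hypothetical stand-in for the two consumer calls the pattern needs. */
    interface Source<T> {
        /** Returns the next message, or null if none arrives within the timeout. */
        T receive(long timeout, TimeUnit unit) throws InterruptedException;

        void acknowledge(T msg);
    }

    /** In-memory source (assumption: replaces a real consumer so no broker is needed). */
    static final class InMemorySource<T> implements Source<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        final Set<T> acked = ConcurrentHashMap.newKeySet();

        InMemorySource(List<T> messages) {
            queue.addAll(messages);
        }

        @Override
        public T receive(long timeout, TimeUnit unit) throws InterruptedException {
            return queue.poll(timeout, unit);
        }

        @Override
        public void acknowledge(T msg) {
            acked.add(msg);
        }
    }

    /**
     * Drains all sources at the same time, one thread per source, acknowledging
     * every message right after it is received. A source's loop ends once it
     * stays idle for idleTimeoutMs.
     */
    static <T> Set<T> drainConcurrently(List<? extends Source<T>> sources, long idleTimeoutMs)
            throws Exception {
        Set<T> received = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
        try {
            List<Future<?>> tasks = new ArrayList<>();
            for (Source<T> source : sources) {
                tasks.add(pool.submit(() -> {
                    T msg;
                    while ((msg = source.receive(idleTimeoutMs, TimeUnit.MILLISECONDS)) != null) {
                        received.add(msg);
                        source.acknowledge(msg);
                    }
                    return null;
                }));
            }
            // Wait for every drain loop to finish and surface any failure.
            for (Future<?> task : tasks) {
                task.get();
            }
        } finally {
            pool.shutdownNow();
        }
        return received;
    }
}
```

With a helper of this shape, the test could pass `consumer2` and `consumer3` together and then assert that the collected set contains all `count` produced values, rather than polling each consumer sequentially and breaking on the first timeout.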



