lhotari commented on code in PR #21657:
URL: https://github.com/apache/pulsar/pull/21657#discussion_r1758694609
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##########
@@ -1630,4 +1629,66 @@ public void
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, delayedMessages + messages);
}
+
+ /**
+ * This test validates key-shared subscription should not get stuck and if
one of the consumer dies then broker
+ * should redeliver those messages to the new consumer instead stop
dispatching messages to all consumers.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testKeySharedMessageRedeliveryWithoutStuck()
+ throws Exception {
+ String topic = "persistent://public/default/key_shared-" +
UUID.randomUUID();
+ boolean enableBatch = false;
+ Set<Integer> values = new HashSet<>();
+
+ @Cleanup
+ Consumer<Integer> consumer1 = createConsumer(topic);
+
+ @Cleanup
+ Producer<Integer> producer = createProducer(topic, enableBatch);
+ int count = 0;
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+ producer.newMessage().key(key).value(count++).send();
+ }
+
+ @Cleanup
+ Consumer<Integer> consumer2 = createConsumer(topic);
+
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+ producer.newMessage().key(key).value(count++).send();
+ }
+
+ @Cleanup
+ Consumer<Integer> consumer3 = createConsumer(topic);
+
+ consumer2.redeliverUnacknowledgedMessages();
+
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
+ producer.newMessage().key(key).value(count++).send();
+ }
+ consumer1.close();
+
+ for(int i = 0; i < count; i++) {
+ Message<Integer> msg = consumer2.receive(100,
TimeUnit.MILLISECONDS);
+ if (msg!=null) {
+ values.add(msg.getValue());
+ } else {
+ break;
+ }
+ }
+ for(int i = 0; i < count; i++) {
+ Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
+ if (msg!=null) {
+ values.add(msg.getValue());
+ } else {
+ break;
+ }
+ }
Review Comment:
this test is currently invalid. Messages would need to be acknowledged and
consumed concurrently. This test passes at least in branch-3.3 when making the
changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]