fretiq opened a new issue #8703:
URL: https://github.com/apache/pulsar/issues/8703
**Describe the bug**
When a Key_Shared subscription is combined with delayed messages, consumers
fail to receive all of the non-delayed messages.
**To Reproduce**
Steps to reproduce the behavior:
1. Create a topic + Key_Shared subscription
1. Send 40 delayed messages
1. Send 40 messages
1. Create a consumer on the subscription with `receiverQueueSize=10`
1. Create another consumer on the same subscription with `receiverQueueSize=10`
1. Receive messages on each consumer until exhaustion
1. See that consumer 1 receives ~13 messages and consumer 2 receives none
Here's a test case:
```java
@Test
public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
    int totalMessages = 40;
    int sum = 0;
    final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
    final String subName = "my-sub";

    // First consumer on the Key_Shared subscription, with a small receiver queue.
    @Cleanup
    Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();

    @Cleanup
    Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
            .topic(topic)
            .create();

    // Delayed messages carry values 100..139 and become deliverable after 10 seconds.
    for (int i = 0; i < totalMessages; i++) {
        producer.newMessage()
                .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                .value(100 + i)
                .deliverAfter(10, TimeUnit.SECONDS)
                .send();
    }

    // Immediate messages carry values 0..39.
    for (int i = 0; i < totalMessages; i++) {
        producer.newMessage()
                .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                .value(i)
                .send();
    }

    // Second consumer joins the same subscription after all messages are published.
    @Cleanup
    Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();

    // Drain consumer1 until a receive times out.
    for (int i = 0; i < 100; i++) {
        Message<Integer> msg = consumer1.receive(1, TimeUnit.SECONDS);
        if (msg != null) {
            log.info("c1 message: {}", msg.getValue());
            consumer1.acknowledge(msg);
        } else {
            break;
        }
        sum++;
    }
    log.info("Got {} messages...", sum);

    // Drain consumer2 the same way; sum keeps the running total across both consumers.
    for (int i = 0; i < 100; i++) {
        Message<Integer> msg = consumer2.receive(1, TimeUnit.SECONDS);
        if (msg != null) {
            log.info("c2 message: {}", msg.getValue());
            consumer2.acknowledge(msg);
        } else {
            break;
        }
        sum++;
    }
    log.info("Got {} other messages...", sum);

    // All 40 immediate messages should arrive before the delayed ones are due.
    Assert.assertEquals(sum, totalMessages);
}
```
<details>
<summary>Logs</summary>
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 0
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 1
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 2
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 3
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 4
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 5
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 6
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 7
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 8
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 9
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 11
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 12
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 16
org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 messages...
org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 other messages...
</details>
**Expected behavior**
All 40 messages should have been received.
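To make the gap concrete, here is a small standalone sketch that compares the values logged by consumer 1 against the expected range 0..39 (the immediate messages in the test case). It does not use the Pulsar client at all; the class name `MissingMessages` and the helper `missing` are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingMessages {
    // Returns the values in [0, total) that do not appear in `received`.
    static List<Integer> missing(int total, int[] received) {
        Set<Integer> seen = new HashSet<>();
        for (int v : received) {
            seen.add(v);
        }
        List<Integer> gaps = new ArrayList<>();
        for (int v = 0; v < total; v++) {
            if (!seen.contains(v)) {
                gaps.add(v);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Values logged by consumer 1 in the run above; consumer 2 logged nothing.
        int[] received = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 16};
        List<Integer> gaps = missing(40, received);
        System.out.println("received=" + received.length + " missing=" + gaps.size());
        System.out.println("missing values: " + gaps);
    }
}
```

For the logged run this reports 13 received and 27 missing, with 10 as the first value that never arrived.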
**Desktop (please complete the following information):**
- OS: macOS
- Pulsar: v2.6.2