congbobo184 opened a new pull request, #17003:
URL: https://github.com/apache/pulsar/pull/17003
issue: reduce the consumer `unackedCount` use incorrect consumer.
reproduce step
```
for (int i = 0; i < 5; i++) {
producer.newMessage().value(("Hello Pulsar - " +
i).getBytes()).sendAsync();
}
// consume-1 receive 5 batch messages
List<MessageId> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(consumer1.receive().getMessageId());
}
// consumer-1 redeliver the batch messages
consumer1.negativeAcknowledge(list.get(0));
// consumer-2 will receive the messages that the consumer-1
redelivered
for (int i = 0; i < 5; i++) {
consumer2.receive().getMessageId();
}
// consumer1 ack two messages in the batch message
consumer1.acknowledge(list.get(1));
consumer1.acknowledge(list.get(2));
// consumer-2 redeliver the rest of the messages
consumer2.negativeAcknowledge(list.get(1));
// consume-1 close will redeliver the rest messages to consumer-2
consumer1.close();
// consumer-2 can receive the rest of 3 messages
for (int i = 0; i < 3; i++) {
consumer2.acknowledge(consumer2.receive().getMessageId());
}
// consumer-2 can't receive any messages, all the messages in batch
has been acked
Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
assertNull(message);
// the number of consumer-2's unacked messages is 0
Awaitility.await().until(() ->
getPulsar().getBrokerService().getTopic(topicName, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages()
== 0);
```
in current code the `getUnackedMessages = 2`
### Motivation
get the correct consumer and reduce the correct un acked messages
### Modifications
change the method `getAckedCountForBatchIndexLevelEnabled` use
ownerConsumer to check the pendingAck messages
### Verifying this change
add the test
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs
/ not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup
issue for adding the documentation
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]