codelipenghui commented on code in PR #18491:
URL: https://github.com/apache/pulsar/pull/18491#discussion_r1023655858
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -439,4 +443,61 @@ public void run() {
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}
+
+ /**
+ * see https://github.com/apache/pulsar/pull/18491
+ */
+ @Test
+ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception {
Review Comment:
I tried the test, but it does not seem to cover the change: it does not
reproduce the issue when the fix is removed.
After changing the test as follows:
```diff
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 56f2d3f29ec..c476cb516aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -474,9 +474,12 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);
+        // For testing the race condition of issue #18491
+        // We need to inject a delay for the pinned internal thread
+        injectDelayToInternalThread(consumer, 1000L);
         consumer.redeliverUnacknowledgedMessages();
-        waitMultiTopicConsumerRedeliverFinish(consumer);
-
+        // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
+        waitForAllTasksForInternalThread(consumer);
         Set<Integer> receivedMsgs = new HashSet<>();
         for (;;){
             Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
@@ -491,7 +494,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
     /**
      * If the task after "redeliver" finish, means task-redeliver finish.
      */
-    private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
+    private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?> consumer, long delayInMillis){
+        ExecutorService internalPinnedExecutor =
+                WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
+        internalPinnedExecutor.execute(() -> {
+            try {
+                Thread.sleep(delayInMillis);
+            } catch (InterruptedException ignore) {
+            }
+        });
+    }
+
+    private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?> consumer) {
         ExecutorService internalPinnedExecutor =
                 WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
         CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();
```
With this change, the test reproduces the issue without the fix and passes
with the fix.
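
For reference, the delay-injection idea in the diff above can be sketched outside of Pulsar. This is only an illustration, assuming the pinned executor behaves like a single-threaded FIFO executor; the class and method names (`PinnedExecutorDelaySketch`, `injectDelay`, `waitForAllTasks`, `runScenario`) are hypothetical, and a plain `ExecutorService` stands in for the consumer's `internalPinnedExecutor`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PinnedExecutorDelaySketch {

    /** Occupies the single worker thread so that tasks submitted later queue up behind the sleep. */
    static void injectDelay(ExecutorService executor, long delayInMillis) {
        executor.execute(() -> {
            try {
                Thread.sleep(delayInMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Blocks until a marker task has run. On a single-threaded FIFO executor this
     * implies every task submitted before the marker has already finished.
     */
    static void waitForAllTasks(ExecutorService executor) throws Exception {
        CompletableFuture<Void> marker = new CompletableFuture<>();
        executor.execute(() -> marker.complete(null));
        marker.get(10, TimeUnit.SECONDS);
    }

    /** Runs the scenario and returns the events in the order they were recorded. */
    static List<String> runScenario(long delayInMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        try {
            injectDelay(executor, delayInMillis);
            // Hypothetical stand-in for the asynchronous redelivery work.
            executor.execute(() -> events.add("redeliver"));
            // The caller returns immediately while the work is still queued;
            // this is the window in which a concurrent receive could race.
            events.add("caller-continues");
            waitForAllTasks(executor);
            events.add("all-tasks-done");
            return events;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runScenario(200L));
    }
}
```

The injected sleep widens the window between the caller returning and the queued work actually running, which is what makes the race reproducible; the marker task then gives the test a deterministic point after which the queued work is known to be done.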