codelipenghui commented on code in PR #18491:
URL: https://github.com/apache/pulsar/pull/18491#discussion_r1023655858


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -439,4 +443,61 @@ public void run() {
         Assert.assertEquals(count, 9);
         Assert.assertEquals(0, datas.size());
     }
+
+    /**
+     * see https://github.com/apache/pulsar/pull/18491
+     */
+    @Test
+    public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception {

Review Comment:
   I tried the test, but it does not seem to cover the change: I could not
   reproduce the issue with it when the fix is removed.
   
   I changed the test like this:
   
   ```diff
   diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
   index 56f2d3f29ec..c476cb516aa 100644
   --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
   +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
   @@ -474,9 +474,12 @@ public class NegativeAcksTest extends ProducerConsumerBase {
    
            Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);
    
   +        // For testing the race condition of issue #18491
   +        // We need to inject a delay for the pinned internal thread
   +        injectDelayToInternalThread(consumer, 1000L);
            consumer.redeliverUnacknowledgedMessages();
   -        waitMultiTopicConsumerRedeliverFinish(consumer);
   -
   +        // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
   +        waitForAllTasksForInternalThread(consumer);
            Set<Integer> receivedMsgs = new HashSet<>();
            for (;;){
                Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
   @@ -491,7 +494,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
        /**
         * If the task after "redeliver" finish, means task-redeliver finish.
         */
   -    private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
   +    private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?> consumer, long delayInMillis){
   +        ExecutorService internalPinnedExecutor =
   +                WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
   +        internalPinnedExecutor.execute(() -> {
   +            try {
   +                Thread.sleep(delayInMillis);
   +            } catch (InterruptedException ignore) {
   +            }
   +        });
   +    }
   +
   +    private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?> consumer) {
            ExecutorService internalPinnedExecutor =
                    WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
            CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();
   ```
   
   With this change, the test reproduces the issue without the fix and passes with the fix.
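
For readers who want to see the delay-injection idea in isolation, here is a minimal sketch using only `java.util.concurrent`. It is not the Pulsar test code: the class `PinnedExecutorDelayDemo` and the methods `injectDelay` and `waitForAllTasks` are hypothetical names, and a plain single-threaded executor plus a queue stand in for the consumer's `internalPinnedExecutor` and incoming queue. It assumes tasks run in FIFO order on one thread, which is what makes the sleeping task hold back the "redeliver" task behind it.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; stands in for the consumer's pinned internal executor.
public class PinnedExecutorDelayDemo {

    /** Occupies the executor's single thread so that later tasks stay queued behind the sleep. */
    public static void injectDelay(ExecutorService singleThreadExecutor, long delayInMillis) {
        singleThreadExecutor.execute(() -> {
            try {
                Thread.sleep(delayInMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Blocks until every task submitted before this call has run (single thread, FIFO order). */
    public static void waitForAllTasks(ExecutorService singleThreadExecutor) {
        CompletableFuture<Void> marker = new CompletableFuture<>();
        singleThreadExecutor.execute(() -> marker.complete(null));
        marker.join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for the incoming message queue that redelivery cleans up.
        Queue<Integer> incoming = new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));

        injectDelay(executor, 200L);      // widen the race window
        executor.execute(incoming::clear); // the "redeliver" task, queued behind the sleep

        // The caller thread can still observe stale entries here: this is the race window.
        System.out.println("size before waiting: " + incoming.size());

        waitForAllTasks(executor);         // the sleep and the clear have both finished
        System.out.println("size after waiting: " + incoming.size());

        executor.shutdown();
    }
}
```

The delay makes the window between submitting the asynchronous cleanup and its execution long enough to hit reliably, instead of depending on thread scheduling. The marker task then gives a definite point after which the cleanup is known to be done.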


