BewareMyPower commented on code in PR #18491:
URL: https://github.com/apache/pulsar/pull/18491#discussion_r1024855178


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -439,4 +443,76 @@ public void run() {
         Assert.assertEquals(count, 9);
         Assert.assertEquals(0, datas.size());
     }
+
+    /**
+     * see https://github.com/apache/pulsar/pull/18491
+     */
+    @Test
+    public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName("my-topic");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final int receiverQueueSize = 10;
+
+        @Cleanup
+        MultiTopicsConsumerImpl<Integer> consumer =
+                (MultiTopicsConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .receiverQueueSize(receiverQueueSize)
+                .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < receiverQueueSize; i++){
+            producer.send(i);
+        }
+
+        Awaitility.await().until(() -> consumer.incomingMessages.size() == 
receiverQueueSize);
+
+        // For testing the race condition of issue #18491
+        // We need to inject a delay for the pinned internal thread
+        injectDelayToInternalThread(consumer, 1000L);
+        consumer.redeliverUnacknowledgedMessages();
+        // Make sure the message redelivery is completed. The incoming queue 
will be cleaned up during the redelivery.
+        waitForAllTasksForInternalThread(consumer);
+
+        Set<Integer> receivedMsgs = new HashSet<>();
+        for (;;){
+            Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null){
+                break;
+            }
+            receivedMsgs.add(msg.getValue());
+        }
+        Assert.assertEquals(receivedMsgs.size(), 10);
+    }
+
+    private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?> 
consumer, long delayInMillis){
+        ExecutorService internalPinnedExecutor =
+                        WhiteboxImpl.getInternalState(consumer, 
"internalPinnedExecutor");
+        internalPinnedExecutor.execute(() -> {
+            try {
+                Thread.sleep(delayInMillis);
+            } catch (InterruptedException ignore) {
+            }
+        });
+    }
+
+    /**
+     * If a task submitted after "redeliver" has finished, the redeliver task has finished as well.
+     */
+    private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl 
consumer){

Review Comment:
   ```suggestion
       private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?> 
consumer) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to