BewareMyPower commented on code in PR #18491:
URL: https://github.com/apache/pulsar/pull/18491#discussion_r1024855178
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -439,4 +443,76 @@ public void run() {
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}
+
+ /**
+ * see https://github.com/apache/pulsar/pull/18491
+ */
+ @Test
+ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName("my-topic");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ final int receiverQueueSize = 10;
+
+ @Cleanup
+ MultiTopicsConsumerImpl<Integer> consumer =
+ (MultiTopicsConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .receiverQueueSize(receiverQueueSize)
+ .subscribe();
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < receiverQueueSize; i++){
+ producer.send(i);
+ }
+
+ Awaitility.await().until(() -> consumer.incomingMessages.size() ==
receiverQueueSize);
+
+ // For testing the race condition of issue #18491
+ // We need to inject a delay for the pinned internal thread
+ injectDelayToInternalThread(consumer, 1000L);
+ consumer.redeliverUnacknowledgedMessages();
+ // Make sure the message redelivery is completed. The incoming queue
will be cleaned up during the redelivery.
+ waitForAllTasksForInternalThread(consumer);
+
+ Set<Integer> receivedMsgs = new HashSet<>();
+ for (;;){
+ Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null){
+ break;
+ }
+ receivedMsgs.add(msg.getValue());
+ }
+ Assert.assertEquals(receivedMsgs.size(), 10);
+ }
+
+ private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?>
consumer, long delayInMillis){
+ ExecutorService internalPinnedExecutor =
+ WhiteboxImpl.getInternalState(consumer,
"internalPinnedExecutor");
+ internalPinnedExecutor.execute(() -> {
+ try {
+ Thread.sleep(delayInMillis);
+ } catch (InterruptedException ignore) {
+ }
+ });
+ }
+
+ /**
+ * If the task after "redeliver" finish, means task-redeliver finish.
+ */
+ private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl
consumer){
Review Comment:
```suggestion
private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?>
consumer) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]