This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch consumer_aware_queue_change
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/consumer_aware_queue_change by
this push:
new 974c765882 add unit test for DefaultMQPushConsumer#setMessageQueueListener
974c765882 is described below
commit 974c7658828fb70028de6ecdd2966b3ad638a2ed
Author: Li Zhanhui <[email protected]>
AuthorDate: Fri Nov 10 15:42:23 2023 +0800
add unit test for DefaultMQPushConsumer#setMessageQueueListener
Signed-off-by: Li Zhanhui <[email protected]>
---
.../consumer/balance/NormalMsgDynamicBalanceIT.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b06589..9bcd2a5ce2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,11 @@
package org.apache.rocketmq.test.client.consumer.balance;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +117,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
+
+ @Test
+ public void testMessageQueueListener() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*",
new RMQNormalListener());
+ // Register message queue listener
+ consumer1.getConsumer().setMessageQueueListener((topic, mqAll,
mqAssigned) -> latch.countDown());
+
+ // Without message queue listener
+ RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR,
consumer1.getConsumerGroup(), topic,
+ "*", new RMQNormalListener());
+
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
}