This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 15d32db03b [ISSUE #7547] Let consumer be aware of message queue
assignment change (#7548)
15d32db03b is described below
commit 15d32db03b20b130473271b182c9ba32cc8048cb
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Nov 13 09:44:25 2023 +0800
[ISSUE #7547] Let consumer be aware of message queue assignment change
(#7548)
* let consumer be aware of message queue assignment change
Signed-off-by: Li Zhanhui <[email protected]>
* add unit test for DefaultMQPushConsumer#setMessageQueueListener
Signed-off-by: Li Zhanhui <[email protected]>
* fix: bazel build warnings
Signed-off-by: Zhanhui Li <[email protected]>
* fix: set MixCommitlogTest test size as medium
Signed-off-by: Zhanhui Li <[email protected]>
* allow cache bazel test results
Signed-off-by: Li Zhanhui <[email protected]>
* fix code style issue by removing unused imports
Signed-off-by: Li Zhanhui <[email protected]>
* fix #7552
Signed-off-by: Zhanhui Li <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
Signed-off-by: Zhanhui Li <[email protected]>
---
.github/workflows/bazel.yml | 2 +-
.../rocketmq/client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++
.../org/apache/rocketmq/client/consumer/MQConsumer.java | 8 +++++---
.../rocketmq/client/consumer/MessageQueueListener.java | 5 ++---
.../client/impl/consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++-
.../client/impl/consumer/RebalancePushImpl.java | 8 +++++++-
.../proxy/service/message/LocalRemotingCommand.java | 1 +
remoting/BUILD.bazel | 1 +
store/BUILD.bazel | 1 +
.../consumer/balance/NormalMsgDynamicBalanceIT.java | 17 +++++++++++++++++
tieredstore/BUILD.bazel | 1 +
tools/BUILD.bazel | 1 +
12 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml
index 7652b93048..af674592bb 100644
--- a/.github/workflows/bazel.yml
+++ b/.github/workflows/bazel.yml
@@ -19,4 +19,4 @@ jobs:
- name: Build
run: bazel build --config=remote //...
- name: Run Tests
- run: bazel test --config=remote --nocache_test_results //...
\ No newline at end of file
+ run: bazel test --config=remote //...
\ No newline at end of file
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 1afb9113eb..e593a17c98 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
private MessageListener messageListener;
+ /**
+ * Listener to call if message queue assignment is changed.
+ */
+ private MessageQueueListener messageQueueListener;
+
/**
* Offset Storage
*/
@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}
+
+ public MessageQueueListener getMessageQueueListener() {
+ return messageQueueListener;
+ }
+
+ public void setMessageQueueListener(MessageQueueListener
messageQueueListener) {
+ this.messageQueueListener = messageQueueListener;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index f4a8eda23a..81e06ee417 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -29,20 +29,22 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
*/
public interface MQConsumer extends MQAdmin {
/**
- * If consuming failure,message will be send back to the brokers,and delay
consuming some time
+ * If consuming of messages failed, they will be sent back to the brokers
for another delivery attempt after
+ * interval specified in delay level.
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws
RemotingException,
MQBrokerException, InterruptedException, MQClientException;
/**
- * If consuming failure,message will be send back to the broker,and delay
consuming some time
+ * If consuming of messages failed, they will be sent back to the brokers
for another delivery attempt after
+ * interval specified in delay level.
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final
String brokerName)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException;
/**
- * Fetch message queues from consumer cache according to the topic
+ * Fetch message queues from consumer cache pertaining to the given topic.
*
* @param topic message topic
* @return queue set
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 63795a6eeb..74510f4c3e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -26,8 +26,7 @@ public interface MessageQueueListener {
/**
* @param topic message topic
* @param mqAll all queues in this message topic
- * @param mqDivided collection of queues,assigned to the current consumer
+ * @param mqAssigned collection of queues, assigned to the current consumer
*/
- void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
- final Set<MessageQueue> mqDivided);
+ void messageQueueChanged(final String topic, final Set<MessageQueue>
mqAll, final Set<MessageQueue> mqAssigned);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e57579321c..cfb89b5c88 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
private long queueMaxSpanFlowControlTimes = 0;
//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300,
360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+ private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240,
300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private static final int MAX_POP_INVISIBLE_TIME = 300000;
private static final int MIN_POP_INVISIBLE_TIME = 5000;
@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
int[] getPopDelayLevel() {
return popDelayLevel;
}
+
+ public MessageQueueListener getMessageQueueListener() {
+ if (null == defaultMQPushConsumer) {
+ return null;
+ }
+ return defaultMQPushConsumer.getMessageQueueListener();
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index df509f3716..f9cf429c69 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
- /**
+ /*
* When rebalance result changed, should update subscription's version
to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl {
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);
+
+ MessageQueueListener messageQueueListener =
defaultMQPushConsumerImpl.getMessageQueueListener();
+ if (null != messageQueueListener) {
+ messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+ }
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 915cafcd57..7bf4a16982 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand {
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
+ cmd.makeCustomHeaderToNet();
return cmd;
}
diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel
index db8b24301d..072148bc08 100644
--- a/remoting/BUILD.bazel
+++ b/remoting/BUILD.bazel
@@ -65,6 +65,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:org_apache_commons_commons_lang3",
+ "@maven//:org_jetbrains_annotations",
],
resources = glob(["src/test/resources/certs/*.pem"]) +
glob(["src/test/resources/certs/*.key"])
)
diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index bf594aaa69..4b046c68eb 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -79,6 +79,7 @@ GenTestRules(
"src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
"src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
+ "src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
],
test_files = glob(["src/test/java/**/*Test.java"]),
deps = [
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b06589..684b718ae5 100644
---
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.test.client.consumer.balance;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
+
+ @Test
+ public void testMessageQueueListener() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*",
new RMQNormalListener());
+ // Register message queue listener
+ consumer1.getConsumer().setMessageQueueListener((topic, mqAll,
mqAssigned) -> latch.countDown());
+
+ // Without message queue listener
+ RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR,
consumer1.getConsumerGroup(), topic,
+ "*", new RMQNormalListener());
+
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
}
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
index 8b6705ac28..e16fca90d0 100644
--- a/tieredstore/BUILD.bazel
+++ b/tieredstore/BUILD.bazel
@@ -41,6 +41,7 @@ java_library(
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:com_alibaba_fastjson",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
+ "@maven//:commons_collections_commons_collections",
],
)
diff --git a/tools/BUILD.bazel b/tools/BUILD.bazel
index 9ccc115335..05d88f7b00 100644
--- a/tools/BUILD.bazel
+++ b/tools/BUILD.bazel
@@ -39,6 +39,7 @@ java_library(
"@maven//:commons_collections_commons_collections",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
+ "@maven//:org_apache_rocketmq_rocketmq_rocksdb",
],
)