This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new 495d573 [ISSUE #274] Fix startup exception (#276)
495d573 is described below
commit 495d573d1964b395c0b6098b8cc498a2ab7f6fd4
Author: yx9o <[email protected]>
AuthorDate: Thu May 23 10:26:17 2024 +0800
[ISSUE #274] Fix startup exception (#276)
* fix: startup exception
* fix: wrong unit test
* Adjust loading order
* Adjust loading order
* Cherry pick #265
---
distribution/conf/spring.xml | 2 +-
.../mqtt/ds/meta/MetaPersistManagerSample.java | 1 -
.../rocketmq/mqtt/ds/notify/NotifyManager.java | 15 +++++++-----
.../ds/test/meta/TestMetaPersistManagerSample.java | 28 ++++++++++++++++------
.../mqtt/ds/test/notify/TestNotifyManager.java | 2 +-
5 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/distribution/conf/spring.xml b/distribution/conf/spring.xml
index 12a4bc6..cf90640 100644
--- a/distribution/conf/spring.xml
+++ b/distribution/conf/spring.xml
@@ -35,7 +35,7 @@
<bean id="authManager"
class="org.apache.rocketmq.mqtt.ds.auth.AuthManagerSample" init-method="init"/>
- <bean id="metaPersistManager"
class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample"
init-method="init"/>
+ <bean id="metaPersistManager"
class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample"
init-method="init" depends-on="notifyManager"/>
<bean id="RetainedPersistManager"
class="org.apache.rocketmq.mqtt.ds.meta.RetainedPersistManagerImpl"
init-method="init"/>
<bean id="willMsgPersistManager"
class="org.apache.rocketmq.mqtt.ds.meta.WillMsgPersistManagerImpl"
init-method="init"/>
diff --git
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
index 97b85e1..ff396c0 100644
---
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
+++
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.mqtt.ds.meta;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
diff --git
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index e027fa5..d94b0f6 100644
---
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -46,9 +46,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.Iterator;
@@ -73,9 +73,7 @@ public class NotifyManager {
private NettyRemotingClient remotingClient;
private DefaultMQProducer defaultMQProducer;
-
- @Resource
- private ServiceConf serviceConf;
+ private final ServiceConf serviceConf;
@Resource
private MetaPersistManager metaPersistManager;
@@ -83,8 +81,13 @@ public class NotifyManager {
@Resource
private FirstTopicManager firstTopicManager;
- @PostConstruct
- public void init() throws MQClientException {
+ @Autowired
+ public NotifyManager(ServiceConf serviceConf) {
+ this.serviceConf = serviceConf;
+ init();
+ }
+
+ private void init() {
defaultMQPushConsumer =
MqFactory.buildDefaultMQPushConsumer(dispatcherConsumerGroup,
serviceConf.getProperties(), new Dispatcher());
defaultMQPushConsumer.setPullInterval(1);
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(64);
diff --git
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
index 1c9eebf..5dad741 100644
---
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
+++
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
@@ -19,41 +19,55 @@ package org.apache.rocketmq.mqtt.ds.test.meta;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.body.Connection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TestMetaPersistManagerSample {
+
private static final String RMQ_NAMESPACE = "LMQ";
+
private static final String KEY_LMQ_ALL_FIRST_TOPICS = "ALL_FIRST_TOPICS";
- private static final String KEY_LMQ_CONNECT_NODES = "LMQ_CONNECT_NODES";
@Test
- public void refreshMeta() throws IllegalAccessException,
RemotingException, InterruptedException, MQClientException,
InvocationTargetException, NoSuchMethodException {
+ public void refreshMeta() throws IllegalAccessException,
RemotingException, InterruptedException, MQClientException,
+ InvocationTargetException, NoSuchMethodException, MQBrokerException {
MetaPersistManagerSample metaPersistManagerSample = new
MetaPersistManagerSample();
DefaultMQAdminExt defaultMQAdminExt = mock(DefaultMQAdminExt.class);
FieldUtils.writeDeclaredField(metaPersistManagerSample,
"defaultMQAdminExt", defaultMQAdminExt, true);
String firstTopic = "test";
String wildcards = "test/2/#";
String node = "localhost";
+ Connection connection = new Connection();
+ connection.setClientAddr(node);
+ HashSet<Connection> connectNodeSet = new HashSet<>();
+ connectNodeSet.add(connection);
+ ConsumerConnection consumerConnection = new ConsumerConnection();
+ consumerConnection.setConnectionSet(connectNodeSet);
+
when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_ALL_FIRST_TOPICS)).thenReturn(firstTopic);
when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,firstTopic)).thenReturn(wildcards);
-
when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_CONNECT_NODES)).thenReturn(node);
+
when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(consumerConnection);
MethodUtils.invokeMethod(metaPersistManagerSample, true,
"refreshMeta");
-
Assert.assertTrue(firstTopic.equals(metaPersistManagerSample.getAllFirstTopics().iterator().next()));
-
Assert.assertTrue(TopicUtils.normalizeTopic(wildcards).equals(metaPersistManagerSample.getWildcards(firstTopic).iterator().next()));
-
Assert.assertTrue(node.equals(metaPersistManagerSample.getConnectNodeSet().iterator().next()));
+ assertEquals(firstTopic,
metaPersistManagerSample.getAllFirstTopics().iterator().next());
+ assertEquals(TopicUtils.normalizeTopic(wildcards),
metaPersistManagerSample.getWildcards(firstTopic).iterator().next());
+ assertEquals("localhost",
metaPersistManagerSample.getConnectNodeSet().iterator().next());
}
}
diff --git
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
index 0e2d28a..fd17bd4 100644
---
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
+++
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
@@ -88,7 +88,7 @@ public class TestNotifyManager {
@Before
public void SetUp() throws IllegalAccessException {
- notifyManager = new NotifyManager();
+ notifyManager = new NotifyManager(serviceConf);
FieldUtils.writeDeclaredField(notifyManager, "metaPersistManager",
metaPersistManager, true);
FieldUtils.writeDeclaredField(notifyManager, "firstTopicManager",
firstTopicManager, true);
FieldUtils.writeDeclaredField(notifyManager, "defaultMQPushConsumer",
defaultMQPushConsumer, true);