This is an automated email from the ASF dual-hosted git repository.
pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new 0a4c230 Move rpc code out of state machine processor
new 4f5dae9 Merge pull request #168 from DongyuanPan/develop_refactor
0a4c230 is described below
commit 0a4c230bf22f5a714145da08de91f9ed7d43302a
Author: dongyuan.pdy <[email protected]>
AuthorDate: Mon Oct 31 16:28:29 2022 +0800
Move rpc code out of state machine processor
---
{conf => distribution/conf}/meta.conf | 0
{conf => distribution/conf}/meta_spring.xml | 0
.../java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java | 2 +-
.../java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java | 6 +++---
.../rocketmq/mqtt/meta/raft/processor/CounterStateProcessor.java | 1 +
.../mqtt/meta/raft/processor/RetainedMsgStateProcessor.java | 1 +
.../mqtt/meta/raft/{processor => rpc}/AbstractRpcProcessor.java | 3 ++-
.../rocketmq/mqtt/meta/raft/{processor => rpc}/Constants.java | 2 +-
.../mqtt/meta/raft/{processor => rpc}/MqttReadRpcProcessor.java | 2 +-
.../mqtt/meta/raft/{processor => rpc}/MqttWriteRpcProcessor.java | 2 +-
.../main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java | 2 +-
.../test/java/org/apache/rocketmq/mqtt/meta/raft/CounterClient.java | 4 ++--
.../org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java | 2 +-
13 files changed, 15 insertions(+), 12 deletions(-)
diff --git a/conf/meta.conf b/distribution/conf/meta.conf
similarity index 100%
rename from conf/meta.conf
rename to distribution/conf/meta.conf
diff --git a/conf/meta_spring.xml b/distribution/conf/meta_spring.xml
similarity index 100%
rename from conf/meta_spring.xml
rename to distribution/conf/meta_spring.xml
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java
index e89c3b6..b48fa89 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
-import org.apache.rocketmq.mqtt.meta.raft.processor.Constants;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
index dab71a6..d857fa7 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
@@ -49,8 +49,8 @@ import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.meta.config.MetaConf;
import org.apache.rocketmq.mqtt.meta.raft.processor.CounterStateProcessor;
-import org.apache.rocketmq.mqtt.meta.raft.processor.MqttReadRpcProcessor;
-import org.apache.rocketmq.mqtt.meta.raft.processor.MqttWriteRpcProcessor;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.MqttReadRpcProcessor;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.MqttWriteRpcProcessor;
import org.apache.rocketmq.mqtt.meta.raft.processor.RetainedMsgStateProcessor;
import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
import org.slf4j.Logger;
@@ -148,7 +148,7 @@ public class MqttRaftServer {
this.cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService();
registerStateProcessor(new CounterStateProcessor());
- registerStateProcessor(new RetainedMsgStateProcessor(metaConf.getMaxRetainedMessageNum())); //add retained msg porcessor
+ registerStateProcessor(new RetainedMsgStateProcessor(metaConf.getMaxRetainedMessageNum())); //add retained msg processor
start();
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/CounterStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/CounterStateProcessor.java
index becc55d..7933020 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/CounterStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/CounterStateProcessor.java
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.Constants;
import org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation;
import org.apache.rocketmq.mqtt.meta.raft.snapshot.impl.CounterSnapshotOperation;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
index 13f0206..b8b662b 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.Constants;
import org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
similarity index 98%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
index 7def602..d595332 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.meta.raft.processor;
+package org.apache.rocketmq.mqtt.meta.raft.rpc;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
@@ -28,6 +28,7 @@ import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.meta.raft.FailoverClosure;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
import org.apache.rocketmq.mqtt.meta.raft.RaftGroupHolder;
+import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/Constants.java
similarity index 94%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/Constants.java
index c91423b..36c3a4d 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/Constants.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.meta.raft.processor;
+package org.apache.rocketmq.mqtt.meta.raft.rpc;
public class Constants {
public static final String COUNTER = "counter";
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
similarity index 96%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
index f20ef79..6e9bdac 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.meta.raft.processor;
+package org.apache.rocketmq.mqtt.meta.raft.rpc;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
similarity index 96%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
index e60e783..80f0569 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.meta.raft.processor;
+package org.apache.rocketmq.mqtt.meta.raft.rpc;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
index 1629586..8b5e8b3 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
@@ -26,6 +26,6 @@ public class Startup {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:meta_spring.xml");
SpringUtil.setApplicationContext(applicationContext);
- System.out.println("start main ...");
+ System.out.println("start meta ...");
}
}
\ No newline at end of file
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/CounterClient.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/CounterClient.java
index 330fa75..4d409a7 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/CounterClient.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/CounterClient.java
@@ -28,7 +28,7 @@ import com.alipay.sofa.jraft.rpc.impl.MarshallerRegistry;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import org.apache.rocketmq.mqtt.common.model.consistency.*;
-import org.apache.rocketmq.mqtt.meta.raft.processor.Constants;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.Constants;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
@@ -40,7 +40,7 @@ public class CounterClient {
public static void main(final String[] args) throws Exception {
final String groupId = Constants.COUNTER + GROUP_SEQ_NUM_SPLIT + 0;
- final String confStr = "";
+ final String confStr = "localhost:25001";
initRpcServer();
final Configuration conf = new Configuration();
if (!conf.parse(confStr)) {
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
index 7a07d60..d3a91e8 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
-import org.apache.rocketmq.mqtt.meta.raft.processor.Constants;
+import org.apache.rocketmq.mqtt.meta.raft.rpc.Constants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;