This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4fac7e8  add some comment (#128)
4fac7e8 is described below

commit 4fac7e8778395d31f294927835acc2780d6d2ef5
Author: Dongyuan Pan <[email protected]>
AuthorDate: Sun Jul 31 15:25:04 2022 +0800

    add some comment (#128)
---
 .../meta/raft/processor/AbstractRpcProcessor.java  | 18 ++++++++++++++
 .../meta/raft/processor/MqttReadRpcProcessor.java  |  3 +++
 .../meta/raft/processor/MqttWriteRpcProcessor.java |  3 +++
 .../mqtt/meta/raft/processor/StateProcessor.java   | 28 ++++++++++++++++++++++
 4 files changed, 52 insertions(+)

diff --git 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
index f469f1b..7def602 100644
--- 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
+++ 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
@@ -33,9 +33,20 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
+/**
+ * RPC abstract processor
+ */
 public abstract class AbstractRpcProcessor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractRpcProcessor.class);
 
+    /**
+     * The default RPC request handler: when the current node is the 
master node of the requested RAFT group, it processes the request
+     *
+     * @param server
+     * @param group
+     * @param rpcCtx
+     * @param message
+     */
     protected void handleRequest(final MqttRaftServer server, final String 
group, final RpcContext rpcCtx, Message message) {
         try {
             final RaftGroupHolder raftGroupHolder = 
server.getRaftGroupHolder(group);
@@ -86,6 +97,13 @@ public abstract class AbstractRpcProcessor {
         return closure;
     }
 
+    /**
+     * Processes linearizable reads: reads from the current node first 
and redirects the request to the master node if the read fails
+     * @param server
+     * @param group
+     * @param rpcCtx
+     * @param request
+     */
     public void handleReadIndex(final MqttRaftServer server, final String 
group, final RpcContext rpcCtx, ReadRequest request) {
         try {
             final RaftGroupHolder raftGroupHolder = 
server.getRaftGroupHolder(group);
diff --git 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
index f3207c6..f20ef79 100644
--- 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
+++ 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
@@ -22,6 +22,9 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 
+/**
+ * The RPC processor for read requests.
+ */
 public class MqttReadRpcProcessor extends AbstractRpcProcessor implements 
RpcProcessor<ReadRequest> {
     private final MqttRaftServer server;
 
diff --git 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
index 528f732..d8dad76 100644
--- 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
+++ 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
@@ -22,6 +22,9 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 
+/**
+ * The RPC processor for write requests.
+ */
 public class MqttWriteRpcProcessor extends AbstractRpcProcessor implements 
RpcProcessor<WriteRequest> {
     private final MqttRaftServer server;
 
diff --git 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index b040dd1..853459c 100644
--- 
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++ 
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -26,22 +26,50 @@ import 
org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation;
 
 import java.util.function.BiConsumer;
 
+/**
+ * Abstract base class for the processing logic of a business state machine
+ */
 public abstract class StateProcessor {
 
+    /**
+     * Process the read request to apply the state machine
+     * @param request
+     * @return
+     */
     public abstract Response onReadRequest(ReadRequest request);
 
+    /**
+     * Process the write request to apply the state machine
+     * @param log
+     * @return
+     */
     public abstract Response onWriteRequest(WriteRequest log);
 
     public SnapshotOperation loadSnapshotOperate() {
         return null;
     }
 
+    /**
+     * Save the state machine snapshot
+     * @param writer
+     * @param callFinally
+     */
     public abstract void onSnapshotSave(SnapshotWriter writer, 
BiConsumer<Boolean, Throwable> callFinally);
 
+    /**
+     * Load the state machine snapshot
+     * @param reader
+     * @return
+     */
     public abstract boolean onSnapshotLoad(SnapshotReader reader);
 
     public void onError(Throwable error) {
     }
+
+    /**
+     * Raft group category. The group category and sequence number 
together uniquely identify a RAFT group
+     * @return
+     */
     public abstract String groupCategory();
 
 }

Reply via email to