This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push:
new 4fac7e8 add some comment (#128)
4fac7e8 is described below
commit 4fac7e8778395d31f294927835acc2780d6d2ef5
Author: Dongyuan Pan <[email protected]>
AuthorDate: Sun Jul 31 15:25:04 2022 +0800
add some comment (#128)
---
.../meta/raft/processor/AbstractRpcProcessor.java | 18 ++++++++++++++
.../meta/raft/processor/MqttReadRpcProcessor.java | 3 +++
.../meta/raft/processor/MqttWriteRpcProcessor.java | 3 +++
.../mqtt/meta/raft/processor/StateProcessor.java | 28 ++++++++++++++++++++++
4 files changed, 52 insertions(+)
diff --git
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
index f469f1b..7def602 100644
---
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
+++
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/AbstractRpcProcessor.java
@@ -33,9 +33,20 @@ import org.slf4j.LoggerFactory;
import java.util.Objects;
+/**
+ * RPC abstract processor
+ */
public abstract class AbstractRpcProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractRpcProcessor.class);
+ /**
+ * The default RPC request handling method, where the current node is the
master node of the requested RAFT group, processes the request
+ *
+ * @param server
+ * @param group
+ * @param rpcCtx
+ * @param message
+ */
protected void handleRequest(final MqttRaftServer server, final String
group, final RpcContext rpcCtx, Message message) {
try {
final RaftGroupHolder raftGroupHolder =
server.getRaftGroupHolder(group);
@@ -86,6 +97,13 @@ public abstract class AbstractRpcProcessor {
return closure;
}
+ /**
+ * To process linear consistent reads, read from the current node first
and redirect the request to the master node if the read fails
+ * @param server
+ * @param group
+ * @param rpcCtx
+ * @param request
+ */
public void handleReadIndex(final MqttRaftServer server, final String
group, final RpcContext rpcCtx, ReadRequest request) {
try {
final RaftGroupHolder raftGroupHolder =
server.getRaftGroupHolder(group);
diff --git
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
index f3207c6..f20ef79 100644
---
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
+++
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttReadRpcProcessor.java
@@ -22,6 +22,9 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+/**
+ * The RPC Processor for read request.
+ */
public class MqttReadRpcProcessor extends AbstractRpcProcessor implements
RpcProcessor<ReadRequest> {
private final MqttRaftServer server;
diff --git
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
index 528f732..d8dad76 100644
---
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
+++
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/MqttWriteRpcProcessor.java
@@ -22,6 +22,9 @@ import com.alipay.sofa.jraft.rpc.RpcProcessor;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+/**
+ * The RPC Processor for write request
+ */
public class MqttWriteRpcProcessor extends AbstractRpcProcessor implements
RpcProcessor<WriteRequest> {
private final MqttRaftServer server;
diff --git
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index b040dd1..853459c 100644
---
a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++
b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -26,22 +26,50 @@ import
org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation;
import java.util.function.BiConsumer;
+/**
+ * A concrete processing class for a business state machine
+ */
public abstract class StateProcessor {
+ /**
+ * Process the read request to apply the state machine
+ * @param request
+ * @return
+ */
public abstract Response onReadRequest(ReadRequest request);
+ /**
+ * Process the write request to apply the state machine
+ * @param log
+ * @return
+ */
public abstract Response onWriteRequest(WriteRequest log);
public SnapshotOperation loadSnapshotOperate() {
return null;
}
+ /**
+ * Save the state machine snapshot
+ * @param writer
+ * @param callFinally
+ */
public abstract void onSnapshotSave(SnapshotWriter writer,
BiConsumer<Boolean, Throwable> callFinally);
+ /**
+ * Load the state machine snapshot
+ * @param reader
+ * @return
+ */
public abstract boolean onSnapshotLoad(SnapshotReader reader);
public void onError(Throwable error) {
}
+
+ /**
+ * Raft Grouping category. The grouping category and sequence number
identify the unique RAFT group
+ * @return
+ */
public abstract String groupCategory();
}