javeme commented on code in PR #1527:
URL: 
https://github.com/apache/incubator-hugegraph/pull/1527#discussion_r873147362


##########
hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java:
##########
@@ -252,10 +228,19 @@ public static synchronized CoreOptions instance() {
     public static final ConfigOption<Integer> RAFT_RPC_TIMEOUT =
             new ConfigOption<>(
                     "raft.rpc_timeout",
-                    "The rpc timeout for jraft rpc.",
+                    "The general rpc timeout for jraft rpc.",
                     positiveInt(),
                     // jraft default value is 5000(ms)
-                    60000
+                    60 * 1000
+            );
+
+    public static final ConfigOption<Integer> RAFT_INSTALL_SNAPSHOT_TIMEOUT =
+            new ConfigOption<>(
+                    "raft.install_snapshot_rpc_timeout",
+                    "The install snapshot rpc timeout for jraft rpc.",
+                    positiveInt(),
+                    // jraft default value is 5 * 60 * 1000

Review Comment:
   5 minutes?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java:
##########
@@ -19,7 +19,7 @@
 
 package com.baidu.hugegraph.backend.store.raft.rpc;
 
-import static 
com.baidu.hugegraph.backend.store.raft.RaftSharedContext.WAIT_RPC_TIMEOUT;
+import static 
com.baidu.hugegraph.backend.store.raft.RaftContext.WAIT_RPC_TIMEOUT;

Review Comment:
   avoid static import



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java:
##########
@@ -98,39 +104,52 @@ public void shutdown() {
         this.node.shutdown();
     }
 
-    public void snapshot() {
-        if (!this.context.useSnapshot()) {
-            return;
-        }
+    public RaftClosure<?> snapshot() {
         RaftClosure<?> future = new RaftClosure<>();
         try {
             this.node().snapshot(future);
-            future.waitFinished();
+            return future;
         } catch (Throwable e) {
             throw new BackendException("Failed to create snapshot", e);
         }
     }
 
-    private Node initRaftNode() throws IOException {
+    private RaftGroupService initRaftNode() throws IOException {
         NodeOptions nodeOptions = this.context.nodeOptions();
         nodeOptions.setFsm(this.stateMachine);
-        // TODO: When support sharding, groupId needs to be bound to shard Id
+        /*
+         * TODO: the groupId is same as graph name now, when support sharding,
+         *  groupId needs to be bound to shard Id
+         */
         String groupId = this.context.group();
         PeerId endpoint = this.context.endpoint();
         /*
          * Start raft node with shared rpc server:
-         * return new RaftGroupService(groupId, endpoint, nodeOptions,
-         *                             this.context.rpcServer(), true)
-         *        .start(false)
          */
-        return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
-                                                        nodeOptions);
+        RpcServer rpcServer = this.context.rpcServer();
+        LOG.info("The raft endpoint '{}', initial group peers [{}]",
+                 endpoint, nodeOptions.getInitialConf());
+        // Shared rpc server
+        return new RaftGroupService(groupId, endpoint, nodeOptions,
+                                    rpcServer, true);
+    }
+
+    public void close() {
+        if (this.raftGroupService != null) {
+            this.raftGroupService.shutdown();
+            try {
+                this.raftGroupService.join();
+            } catch (final InterruptedException e) {
+                throw new RaftException("Interrupted while shutdown " +
+                                                "raftGroupService");

Review Comment:
   align



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -389,7 +392,6 @@ private void registerRpcRequestProcessors() {
         this.rpcServer.registerProcessor(new SetLeaderProcessor(this));
         this.rpcServer.registerProcessor(new ListPeersProcessor(this));
     }
-

Review Comment:
   ditto



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -104,26 +106,41 @@ public final class RaftSharedContext {
     private RaftGroupManager raftGroupManager;
     private RpcForwarder rpcForwarder;
 
-    public RaftSharedContext(HugeGraphParams params) {
+    public RaftContext(HugeGraphParams params, RpcServer rpcServer,
+                       PeerId endpoint) {
         this.params = params;
-        HugeConfig config = this.config();
+        this.rpcServer = rpcServer;
+        this.endpoint = endpoint;
 
+        HugeConfig config = params.configuration();
         this.schemaStoreName = config.get(CoreOptions.STORE_SCHEMA);
         this.graphStoreName = config.get(CoreOptions.STORE_GRAPH);
         this.systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
         this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];
-        this.rpcServer = this.initAndStartRpcServer();
+
+        /*
+         * TODO Depending on the name of the config item for server options,
+         * need to get through ServerConfig and CoreConfig

Review Comment:
   raft.group_peers is transferred from ServerConfig instead of CoreConfig, 
since it's shared by all graphs.



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -104,26 +106,41 @@ public final class RaftSharedContext {
     private RaftGroupManager raftGroupManager;
     private RpcForwarder rpcForwarder;
 
-    public RaftSharedContext(HugeGraphParams params) {
+    public RaftContext(HugeGraphParams params, RpcServer rpcServer,
+                       PeerId endpoint) {
         this.params = params;
-        HugeConfig config = this.config();
+        this.rpcServer = rpcServer;
+        this.endpoint = endpoint;
 
+        HugeConfig config = params.configuration();
         this.schemaStoreName = config.get(CoreOptions.STORE_SCHEMA);
         this.graphStoreName = config.get(CoreOptions.STORE_GRAPH);
         this.systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
         this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];
-        this.rpcServer = this.initAndStartRpcServer();
+
+        /*
+         * TODO Depending on the name of the config item for server options,
+         * need to get through ServerConfig and CoreConfig
+         */
+        // this.endpoint = new PeerId();
+        // String endpointStr = config.getString("raft.endpoint");
+        // if (!this.endpoint.parse(endpointStr)) {
+        //     throw new HugeException("Failed to parse endpoint %s", 
endpointStr);
+        // }
+        this.groupPeers = new Configuration();
+        String groupPeersStr = config.getString("raft.group_peers");
+        if (!this.groupPeers.parse(groupPeersStr)) {
+            throw new HugeException("Failed to parse group peers %s",
+                                    groupPeersStr);
+        }
+
         if (config.get(CoreOptions.RAFT_SAFE_READ)) {
             int threads = config.get(CoreOptions.RAFT_READ_INDEX_THREADS);
             this.readIndexExecutor = this.createReadIndexExecutor(threads);
         } else {
             this.readIndexExecutor = null;
         }
-        if (config.get(CoreOptions.RAFT_USE_SNAPSHOT)) {
-            this.snapshotExecutor = this.createSnapshotExecutor(4);
-        } else {
-            this.snapshotExecutor = null;
-        }
+        this.snapshotExecutor = this.createSnapshotExecutor(4);

Review Comment:
   define a const var



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java:
##########
@@ -45,18 +48,21 @@ public final class RaftNode {
 
     private static final Logger LOG = Log.logger(RaftNode.class);
 
-    private final RaftSharedContext context;
+    private final RaftContext context;
+    private final RaftGroupService raftGroupService;
     private final Node node;
     private final StoreStateMachine stateMachine;
     private final AtomicReference<LeaderInfo> leaderInfo;
     private final AtomicBoolean started;
     private final AtomicInteger busyCounter;
 
-    public RaftNode(RaftSharedContext context) {
+    public RaftNode(RaftContext context) {
         this.context = context;
         this.stateMachine = new StoreStateMachine(context);
         try {
-            this.node = this.initRaftNode();
+            this.raftGroupService = this.initRaftNode();
+            // Start node
+            this.node = this.raftGroupService.start(false);

Review Comment:
   Do we need to hold raftGroupService?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java:
##########
@@ -98,39 +104,52 @@ public void shutdown() {
         this.node.shutdown();
     }
 
-    public void snapshot() {
-        if (!this.context.useSnapshot()) {
-            return;
-        }
+    public RaftClosure<?> snapshot() {
         RaftClosure<?> future = new RaftClosure<>();
         try {
             this.node().snapshot(future);
-            future.waitFinished();
+            return future;
         } catch (Throwable e) {
             throw new BackendException("Failed to create snapshot", e);
         }
     }
 
-    private Node initRaftNode() throws IOException {
+    private RaftGroupService initRaftNode() throws IOException {
         NodeOptions nodeOptions = this.context.nodeOptions();
         nodeOptions.setFsm(this.stateMachine);
-        // TODO: When support sharding, groupId needs to be bound to shard Id
+        /*
+         * TODO: the groupId is same as graph name now, when support sharding,
+         *  groupId needs to be bound to shard Id
+         */
         String groupId = this.context.group();
         PeerId endpoint = this.context.endpoint();
         /*
          * Start raft node with shared rpc server:
-         * return new RaftGroupService(groupId, endpoint, nodeOptions,
-         *                             this.context.rpcServer(), true)
-         *        .start(false)
          */
-        return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
-                                                        nodeOptions);
+        RpcServer rpcServer = this.context.rpcServer();
+        LOG.info("The raft endpoint '{}', initial group peers [{}]",
+                 endpoint, nodeOptions.getInitialConf());
+        // Shared rpc server
+        return new RaftGroupService(groupId, endpoint, nodeOptions,
+                                    rpcServer, true);

Review Comment:
   can share rpc-server by RaftServiceFactory.createAndInitRaftNode()



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -353,14 +352,17 @@ public ExecutorService backendExecutor() {
         return this.backendExecutor;
     }
 
+    public ExecutorService readIndexExecutor() {
+        return this.readIndexExecutor;
+    }
+
     public GraphMode graphMode() {
         return this.params.mode();
     }
 
     private HugeConfig config() {
         return this.params.configuration();
     }
-

Review Comment:
   keep blank line



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -181,10 +213,15 @@ public void clear() {
     }
 
     @Override
-    public void truncate() {
+    public void truncate(HugeGraph graph) {

Review Comment:
   pass systemStoreName?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java:
##########
@@ -252,10 +228,19 @@ public static synchronized CoreOptions instance() {
     public static final ConfigOption<Integer> RAFT_RPC_TIMEOUT =
             new ConfigOption<>(
                     "raft.rpc_timeout",
-                    "The rpc timeout for jraft rpc.",
+                    "The general rpc timeout for jraft rpc.",

Review Comment:
   in milliseconds



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -375,6 +377,7 @@ private RpcServer initAndStartRpcServer() {
         NodeManager.getInstance().addAddress(endpoint.getEndpoint());
         RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
                                                    endpoint.getEndpoint());
+        LOG.info("RPC server is started successfully");

Review Comment:
   replaced by 
https://github.com/apache/incubator-hugegraph/pull/1527/files#diff-7a0b04965824fe2f392d66bc9cb0addb6b059d81142c3117f8ff6c390b5e4e03R294



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java:
##########
@@ -98,39 +104,52 @@ public void shutdown() {
         this.node.shutdown();
     }
 
-    public void snapshot() {
-        if (!this.context.useSnapshot()) {
-            return;
-        }
+    public RaftClosure<?> snapshot() {
         RaftClosure<?> future = new RaftClosure<>();
         try {
             this.node().snapshot(future);
-            future.waitFinished();
+            return future;
         } catch (Throwable e) {
             throw new BackendException("Failed to create snapshot", e);
         }
     }
 
-    private Node initRaftNode() throws IOException {
+    private RaftGroupService initRaftNode() throws IOException {
         NodeOptions nodeOptions = this.context.nodeOptions();
         nodeOptions.setFsm(this.stateMachine);
-        // TODO: When support sharding, groupId needs to be bound to shard Id
+        /*
+         * TODO: the groupId is same as graph name now, when support sharding,
+         *  groupId needs to be bound to shard Id
+         */
         String groupId = this.context.group();
         PeerId endpoint = this.context.endpoint();
         /*
          * Start raft node with shared rpc server:
-         * return new RaftGroupService(groupId, endpoint, nodeOptions,
-         *                             this.context.rpcServer(), true)
-         *        .start(false)
          */
-        return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
-                                                        nodeOptions);
+        RpcServer rpcServer = this.context.rpcServer();
+        LOG.info("The raft endpoint '{}', initial group peers [{}]",
+                 endpoint, nodeOptions.getInitialConf());
+        // Shared rpc server
+        return new RaftGroupService(groupId, endpoint, nodeOptions,
+                                    rpcServer, true);
+    }
+
+    public void close() {
+        if (this.raftGroupService != null) {
+            this.raftGroupService.shutdown();
+            try {
+                this.raftGroupService.join();

Review Comment:
   don't need to shutdown raftGroupService when raftNode closed?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -45,22 +50,50 @@ public class RaftBackendStoreProvider implements 
BackendStoreProvider {
     private static final Logger LOG = 
Log.logger(RaftBackendStoreProvider.class);
 
     private final BackendStoreProvider provider;
-    private final RaftSharedContext context;
     private RaftBackendStore schemaStore;
     private RaftBackendStore graphStore;
     private RaftBackendStore systemStore;
+    private RaftContext context;
 
-    public RaftBackendStoreProvider(BackendStoreProvider provider,
-                                    HugeGraphParams params) {
+    public RaftBackendStoreProvider(BackendStoreProvider provider) {
         this.provider = provider;
-        this.context = new RaftSharedContext(params);
         this.schemaStore = null;
         this.graphStore = null;
         this.systemStore = null;
+        this.context = null;
+    }
+
+    public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
+        HugeConfig config = params.configuration();
+        Integer lowWaterMark = config.get(
+                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
+        System.setProperty("bolt.channel_write_buf_low_water_mark",

Review Comment:
   reuse from RaftContext.initAndStartRpcServer()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to