This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dbe90a  [IOTDB-2193] Reduce unnecessary lock operations of RaftLogManager to improve writing performance (#4638)
4dbe90a is described below

commit 4dbe90ab4fad7b998abb443a448b8769c72fcfe5
Author: Mrquan <[email protected]>
AuthorDate: Wed Dec 29 09:02:08 2021 +0800

    [IOTDB-2193] Reduce unnecessary lock operations of RaftLogManager to improve writing performance (#4638)
---
 .../iotdb/cluster/server/member/RaftMember.java    | 101 +++++++++++----------
 1 file changed, 55 insertions(+), 46 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ad356fb..1e71fed 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -967,34 +967,40 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     long startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
 
     Log log;
+
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+    }
+
+    // if a single log exceeds the threshold
+    // we need to return error code to the client as in server mode
+    if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+        & log.serialize().capacity() + Integer.BYTES
+            >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
     // assign term and index to the new log and append it
     synchronized (logManager) {
-      if (plan instanceof LogPlan) {
-        try {
-          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-        } catch (UnknownLogTypeException e) {
-          logger.error("Can not parse LogPlan {}", plan, e);
-          return StatusUtils.PARSE_LOG_ERROR;
-        }
-      } else {
-        log = new PhysicalPlanLog();
-        ((PhysicalPlanLog) log).setPlan(plan);
+      if (!(plan instanceof LogPlan)) {
         plan.setIndex(logManager.getLastLogIndex() + 1);
       }
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
-      // if a single log exceeds the threshold
-      // we need to return error code to the client as in server mode
-      if (log.serialize().capacity() + Integer.BYTES
-          >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-        logger.error(
-            "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-                + "or reduce the size of requests you send.");
-        return StatusUtils.INTERNAL_ERROR;
-      }
       logManager.append(log);
     }
+
     
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
 
     try {
@@ -1015,51 +1021,54 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest;
 
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+    }
+
+    // just like processPlanLocally,we need to check the size of log
+    if 
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+        & log.serialize().capacity() + Integer.BYTES
+            >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
     long startTime =
         
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
-    Log log;
     synchronized (logManager) {
       
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
           startTime);
 
-      if (plan instanceof LogPlan) {
-        try {
-          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-        } catch (UnknownLogTypeException e) {
-          logger.error("Can not parse LogPlan {}", plan, e);
-          return StatusUtils.PARSE_LOG_ERROR;
-        }
-      } else {
-        log = new PhysicalPlanLog();
-        ((PhysicalPlanLog) log).setPlan(plan);
+      if (!(plan instanceof LogPlan)) {
         plan.setIndex(logManager.getLastLogIndex() + 1);
       }
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
       startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
-      // just like processPlanLocally,we need to check the size of log
-      if (log.serialize().capacity() + Integer.BYTES
-          >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-        logger.error(
-            "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-                + "or reduce the size of requests you send.");
-        return StatusUtils.INTERNAL_ERROR;
-      }
-      // logDispatcher will serialize log, and set log size, and we will use 
the size after it
       logManager.append(log);
       
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
       startTime = 
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
       sendLogRequest = buildSendLogRequest(log);
       
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
-      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-      log.setCreateTime(System.nanoTime());
-      getLogDispatcher().offer(sendLogRequest);
-      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
     }
 
+    startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+    log.setCreateTime(System.nanoTime());
+    getLogDispatcher().offer(sendLogRequest);
+    Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
     try {
       AppendLogResult appendLogResult =
           waitAppendResult(

Reply via email to