This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4dbe90a [IOTDB-2193] Reduce unnecessary lock operations of
RaftLogManager to improve writing performance (#4638)
4dbe90a is described below
commit 4dbe90ab4fad7b998abb443a448b8769c72fcfe5
Author: Mrquan <[email protected]>
AuthorDate: Wed Dec 29 09:02:08 2021 +0800
[IOTDB-2193] Reduce unnecessary lock operations of RaftLogManager to
improve writing performance (#4638)
---
.../iotdb/cluster/server/member/RaftMember.java | 101 +++++++++++----------
1 file changed, 55 insertions(+), 46 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ad356fb..1e71fed 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -967,34 +967,40 @@ public abstract class RaftMember implements
RaftMemberMBean {
long startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
Log log;
+
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ }
+
+ // if a single log exceeds the threshold
+ // we need to return error code to the client as in server mode
+ if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+ & log.serialize().capacity() + Integer.BYTES
+ >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
+
// assign term and index to the new log and append it
synchronized (logManager) {
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
+ if (!(plan instanceof LogPlan)) {
plan.setIndex(logManager.getLastLogIndex() + 1);
}
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
- // if a single log exceeds the threshold
- // we need to return error code to the client as in server mode
- if (log.serialize().capacity() + Integer.BYTES
- >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
logManager.append(log);
}
+
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
try {
@@ -1015,51 +1021,54 @@ public abstract class RaftMember implements
RaftMemberMBean {
// assign term and index to the new log and append it
SendLogRequest sendLogRequest;
+ Log log;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ }
+
+ // just like processPlanLocally,we need to check the size of log
+ if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+ & log.serialize().capacity() + Integer.BYTES
+ >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
+
long startTime =
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
- Log log;
synchronized (logManager) {
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
startTime);
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
+ if (!(plan instanceof LogPlan)) {
plan.setIndex(logManager.getLastLogIndex() + 1);
}
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
- // just like processPlanLocally,we need to check the size of log
- if (log.serialize().capacity() + Integer.BYTES
- >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
- // logDispatcher will serialize log, and set log size, and we will use
the size after it
logManager.append(log);
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
startTime =
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
sendLogRequest = buildSendLogRequest(log);
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- getLogDispatcher().offer(sendLogRequest);
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
}
+ startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ log.setCreateTime(System.nanoTime());
+ getLogDispatcher().offer(sendLogRequest);
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
try {
AppendLogResult appendLogResult =
waitAppendResult(