This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 420fa10 [To rel/0.12] Add a check that the raft log size fits into
the buffer before log appending (#4670)
420fa10 is described below
commit 420fa10e7991a5e4377aaa5bdc1643f0a88b7ab1
Author: Mrquan <[email protected]>
AuthorDate: Fri Dec 31 09:01:12 2021 +0800
[To rel/0.12] Add a check that the raft log size fits into the
buffer before log appending (#4670)
* fix the client ip bug in cluster
* fix a typo
* Add an example for Cluster setup on 3 nodes
* add a judgement
Co-authored-by: 权思屹 <[email protected]>
---
.../iotdb/cluster/server/member/RaftMember.java | 35 +++++++++++++++++-----
1 file changed, 28 insertions(+), 7 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 99aeb15..36794bd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -970,12 +970,22 @@ public abstract class RaftMember {
}
long startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
PhysicalPlanLog log = new PhysicalPlanLog();
+ log.setPlan(plan);
+ // if a single log exceeds the threshold
+ // we need to return error code to the client as in server mode
+ if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+ & log.serialize().capacity() + Integer.BYTES
+ >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
// assign term and index to the new log and append it
synchronized (logManager) {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- log.setPlan(plan);
plan.setIndex(log.getCurrLogIndex());
// appendLogInGroup will serialize log, and set log size, and we will
use the size after it
logManager.append(log);
@@ -1001,6 +1011,17 @@ public abstract class RaftMember {
// assign term and index to the new log and append it
SendLogRequest sendLogRequest;
+ log.setPlan(plan);
+ // just like processPlanLocally,we need to check the size of log
+ if
(ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
+ & log.serialize().capacity() + Integer.BYTES
+ >=
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
+
long startTime =
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
synchronized (logManager) {
@@ -1009,7 +1030,7 @@ public abstract class RaftMember {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- log.setPlan(plan);
+
plan.setIndex(log.getCurrLogIndex());
startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
@@ -1020,13 +1041,13 @@ public abstract class RaftMember {
startTime =
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
sendLogRequest = buildSendLogRequest(log);
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- getLogDispatcher().offer(sendLogRequest);
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
}
+ startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ log.setCreateTime(System.nanoTime());
+ getLogDispatcher().offer(sendLogRequest);
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
try {
AppendLogResult appendLogResult =
waitAppendResult(