This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 7214e62c5be reset window after snapshot installation clear
serialization cache in followers fix match term
7214e62c5be is described below
commit 7214e62c5bebef67c4c369c45859be9ed09bc438
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 22 11:24:05 2023 +0800
reset window after snapshot installation
clear serialization cache in followers
fix match term
---
.../org/apache/iotdb/consensus/natraft/protocol/RaftMember.java | 9 ++++++++-
.../natraft/protocol/log/appender/SlidingWindowLogAppender.java | 8 +++++---
.../natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java | 5 ++++-
.../consensus/natraft/protocol/log/manager/RaftLogManager.java | 4 ++++
.../log/manager/serialization/SyncLogDequeSerializer.java | 9 ++++++++-
.../consensus/natraft/protocol/log/recycle/EntryAllocator.java | 1 -
.../iotdb/db/mpp/execution/executor/RegionWriteExecutor.java | 4 ++--
7 files changed, 31 insertions(+), 9 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 9be05b5dcce..2e6cc318930 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -237,7 +237,7 @@ public class RaftMember {
new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
this.logManager =
new DirectorySnapshotRaftLogManager(
- new SyncLogDequeSerializer(groupId, config),
+ new SyncLogDequeSerializer(groupId, config, this),
new AsyncLogApplier(new BaseApplier(stateMachine, this), name,
config),
name,
stateMachine,
@@ -1085,6 +1085,13 @@ public class RaftMember {
} finally {
snapshotApplyLock.unlock();
}
+ logManager.getLock().writeLock().lock();
+ try {
+ logAppender.reset();
+ } finally {
+ logManager.getLock().writeLock().unlock();
+ }
+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 07486cb4edb..edd4dbe63cb 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -165,9 +165,11 @@ public class SlidingWindowLogAppender implements
LogAppender {
private void moveWindowLeftward(int step) {
int length = Math.max(windowCapacity - step, 0);
- System.arraycopy(logWindow, 0, logWindow, step, length);
- for (int i = 0; i < length; i++) {
- logWindow[i] = null;
+ if (length > 0) {
+ System.arraycopy(logWindow, 0, logWindow, step, length);
+ for (int i = 0; i < length; i++) {
+ logWindow[i] = null;
+ }
}
firstPosPrevIndex = logManager.getLastLogIndex();
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
index e6892886c81..9a049a168f0 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
@@ -82,7 +82,10 @@ public class LogCatchUpInBatchHandler implements
AsyncMethodCallback<AppendEntry
appendSucceed.notifyAll();
}
logger.warn(
- "{}: Catch-up with {} logs aborted because leadership is lost",
logs.size(), memberName);
+ "{}: Catch-up with {} logs aborted because leadership is lost, resp:
{}",
+ logs.size(),
+ memberName,
+ resp);
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 337cc04e9d1..2b4eb410e8f 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -702,6 +702,10 @@ public abstract class RaftLogManager {
} catch (Exception e) {
return false;
}
+ if (term == -1) {
+ // the leader is probing for catch-up and the entry must be committed
+ return index <= commitIndex;
+ }
return t == term;
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index a3a2ea890c2..56b742c825d 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.HardState;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import
org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
@@ -161,6 +162,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private RaftConfig config;
private ICompressor compressor =
ICompressor.getCompressor(CompressionType.SNAPPY);
private IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+ private RaftMember member;
private void initCommonProperties() {
logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
@@ -210,12 +212,13 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
*
* <p>build serializer with node id
*/
- public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config) {
+ public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config,
RaftMember member) {
this.config = config;
name = groupId.toString();
logDir = getLogDir(groupId);
initCommonProperties();
initMetaAndLogFiles();
+ this.member = member;
}
public String getLogDir(ConsensusGroupId groupId) {
@@ -320,6 +323,10 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
flushLogBuffer(true);
logDataBuffer.put(logData);
}
+ // followers only use the cache in persistence, and it is of no use beyond
this point
+ if (!member.isLeader()) {
+ entry.setSerializationCache(null);
+ }
lastLogIndex = entry.getCurrLogIndex();
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
index b98cdbde9ae..d08e5e9a5da 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -125,7 +125,6 @@ public class EntryAllocator<T extends Entry> {
+ ","
+ "recycleRatio="
+ recycleHitRatio()
- + ","
+ "}";
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 302cd7e3fc2..06735caa5d4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -235,8 +235,8 @@ public class RegionWriteExecutor {
if (writeResponse.getStatus() != null) {
response.setAccepted(
TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode()
- || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
- == writeResponse.getStatus().getCode());
+ || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+ == writeResponse.getStatus().getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
writeResponse.getStatus().getCode()) {
response.setMessage(writeResponse.getStatus().message);
response.setStatus(writeResponse.getStatus());