This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch research/traft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/traft by this push:
new 6de79acdbe7 [TRaft]Raft Optimization for Time series Workloads (#17273)
6de79acdbe7 is described below
commit 6de79acdbe7f0576eaf57bf218e7162ea5e37031
Author: Little Health <[email protected]>
AuthorDate: Sun Mar 8 23:06:30 2026 +0800
[TRaft]Raft Optimization for Time series Workloads (#17273)
---
.../manager/load/balancer/RouteBalancer.java | 3 +
.../procedure/env/RegionMaintainHandler.java | 4 +-
.../apache/iotdb/consensus/ConsensusFactory.java | 1 +
.../common/request/IConsensusRequest.java | 14 +
.../iotdb/consensus/config/ConsensusConfig.java | 18 +-
.../apache/iotdb/consensus/config/TRaftConfig.java | 141 +++
.../iotdb/consensus/traft/TRaftConsensus.java | 328 +++++++
.../iotdb/consensus/traft/TRaftFollowerInfo.java | 153 ++++
.../iotdb/consensus/traft/TRaftLogAppender.java | 95 ++
.../iotdb/consensus/traft/TRaftLogEntry.java | 89 ++
.../iotdb/consensus/traft/TRaftLogStore.java | 155 ++++
.../iotdb/consensus/traft/TRaftNodeRegistry.java | 54 ++
.../iotdb/consensus/traft/TRaftRequestParser.java | 52 ++
.../TRaftRole.java} | 31 +-
.../iotdb/consensus/traft/TRaftServerImpl.java | 985 +++++++++++++++++++++
.../TRaftVoteRequest.java} | 52 +-
.../TRaftVoteResult.java} | 34 +-
.../db/consensus/DataRegionConsensusImpl.java | 2 +
.../plan/node/pipe/PipeEnrichedInsertNode.java | 10 +
.../plan/planner/plan/node/write/InsertNode.java | 6 +
.../planner/plan/node/write/InsertRowNode.java | 1 +
.../planner/plan/node/write/InsertRowsNode.java | 5 +
.../planner/plan/node/write/InsertTabletNode.java | 5 +
.../plan/node/write/RelationalInsertRowNode.java | 5 +
.../plan/node/write/RelationalInsertRowsNode.java | 6 +
.../node/write/RelationalInsertTabletNode.java | 5 +
.../java/org/apache/iotdb/db/service/DataNode.java | 2 +-
27 files changed, 2181 insertions(+), 75 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 37d86daf153..347a8e5399e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -87,6 +87,8 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
&&
ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&&
ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
+ || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
+ &&
ConsensusFactory.TRAFT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&&
ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
// The simple consensus protocol will always automatically designate
itself as the leader
@@ -200,6 +202,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
newLeaderId);
switch (consensusProtocolClass) {
case ConsensusFactory.IOT_CONSENSUS:
+ case ConsensusFactory.TRAFT_CONSENSUS:
case ConsensusFactory.SIMPLE_CONSENSUS:
// For IoTConsensus or SimpleConsensus protocol, change
// RegionRouteMap is enough
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index f827adea5d4..e9ae21e9492 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -91,6 +91,7 @@ import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.TRAFT_CONSENSUS;
public class RegionMaintainHandler {
@@ -171,7 +172,8 @@ public class RegionMaintainHandler {
List<TDataNodeLocation> currentPeerNodes;
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
&& (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())
- ||
IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) {
+ ||
IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())
+ ||
TRAFT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()))) {
// parameter of createPeer for MultiLeader should be all peers
currentPeerNodes = new ArrayList<>(regionReplicaNodes);
currentPeerNodes.add(destDataNode);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 9d7a30279ad..1cd071e871b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -37,6 +37,7 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS =
"org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS =
"org.apache.iotdb.consensus.iot.IoTConsensus";
+ public static final String TRAFT_CONSENSUS =
"org.apache.iotdb.consensus.traft.TRaftConsensus";
public static final String REAL_PIPE_CONSENSUS =
"org.apache.iotdb.consensus.pipe.PipeConsensus";
public static final String IOT_CONSENSUS_V2 =
"org.apache.iotdb.consensus.iot.IoTConsensusV2";
public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch";
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index cae54c2bcbc..6079ac75505 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -36,6 +36,20 @@ public interface IConsensusRequest {
*/
ByteBuffer serializeToByteBuffer();
+ default boolean hasTime() {
+ return false;
+ }
+
+ /**
+ * Return the primary timestamp carried by this request.
+ *
+ * <p>Callers should check {@link #hasTime()} before calling this method.
+ */
+ default long getTime() {
+ throw new UnsupportedOperationException(
+ String.format("%s does not carry timestamp", getClass().getName()));
+ }
+
default long getMemorySize() {
// return 0 by default
return 0;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index dba53214abd..10109ec06b5 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -33,6 +33,7 @@ public class ConsensusConfig {
private final RatisConfig ratisConfig;
private final IoTConsensusConfig iotConsensusConfig;
private final PipeConsensusConfig pipeConsensusConfig;
+ private final TRaftConfig tRaftConfig;
private ConsensusConfig(
TEndPoint thisNode,
@@ -41,7 +42,8 @@ public class ConsensusConfig {
TConsensusGroupType consensusGroupType,
RatisConfig ratisConfig,
IoTConsensusConfig iotConsensusConfig,
- PipeConsensusConfig pipeConsensusConfig) {
+ PipeConsensusConfig pipeConsensusConfig,
+ TRaftConfig tRaftConfig) {
this.thisNodeEndPoint = thisNode;
this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
@@ -49,6 +51,7 @@ public class ConsensusConfig {
this.ratisConfig = ratisConfig;
this.iotConsensusConfig = iotConsensusConfig;
this.pipeConsensusConfig = pipeConsensusConfig;
+ this.tRaftConfig = tRaftConfig;
}
public TEndPoint getThisNodeEndPoint() {
@@ -79,6 +82,10 @@ public class ConsensusConfig {
return pipeConsensusConfig;
}
+ public TRaftConfig getTRaftConfig() {
+ return tRaftConfig;
+ }
+
public static ConsensusConfig.Builder newBuilder() {
return new ConsensusConfig.Builder();
}
@@ -92,6 +99,7 @@ public class ConsensusConfig {
private RatisConfig ratisConfig;
private IoTConsensusConfig iotConsensusConfig;
private PipeConsensusConfig pipeConsensusConfig;
+ private TRaftConfig tRaftConfig;
public ConsensusConfig build() {
return new ConsensusConfig(
@@ -103,7 +111,8 @@ public class ConsensusConfig {
Optional.ofNullable(iotConsensusConfig)
.orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
Optional.ofNullable(pipeConsensusConfig)
- .orElseGet(() -> PipeConsensusConfig.newBuilder().build()));
+ .orElseGet(() -> PipeConsensusConfig.newBuilder().build()),
+ Optional.ofNullable(tRaftConfig).orElseGet(() ->
TRaftConfig.newBuilder().build()));
}
public Builder setThisNode(TEndPoint thisNode) {
@@ -140,5 +149,10 @@ public class ConsensusConfig {
this.pipeConsensusConfig = pipeConsensusConfig;
return this;
}
+
+ public Builder setTRaftConfig(TRaftConfig tRaftConfig) {
+ this.tRaftConfig = tRaftConfig;
+ return this;
+ }
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
new file mode 100644
index 00000000000..25296b1f107
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import java.util.Optional;
+
/**
 * Immutable configuration holder for the TRaft consensus protocol.
 *
 * <p>Groups the tunables into two sections: {@link Replication} (flow control for log
 * replication) and {@link Election} (leader-election randomness). Instances are built via
 * {@link #newBuilder()}; any section left unset falls back to its own defaults.
 */
public class TRaftConfig {

  private final Replication replication;
  private final Election election;

  private TRaftConfig(Replication replication, Election election) {
    this.replication = replication;
    this.election = election;
  }

  /** Returns the replication-related settings (never null). */
  public Replication getReplication() {
    return replication;
  }

  /** Returns the election-related settings (never null). */
  public Election getElection() {
    return election;
  }

  /** Creates a builder whose unset sections default to their respective defaults. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /** Builder for {@link TRaftConfig}. */
  public static class Builder {

    private Replication replication;
    private Election election;

    public Builder setReplication(Replication replication) {
      this.replication = replication;
      return this;
    }

    public Builder setElection(Election election) {
      this.election = election;
      return this;
    }

    public TRaftConfig build() {
      // Fall back to default sections when the caller configured nothing explicitly.
      Replication effectiveReplication =
          replication == null ? Replication.newBuilder().build() : replication;
      Election effectiveElection = election == null ? Election.newBuilder().build() : election;
      return new TRaftConfig(effectiveReplication, effectiveElection);
    }
  }

  /** Replication flow-control settings. */
  public static class Replication {

    private final int maxPendingRetryEntriesPerFollower;
    private final long waitingReplicationTimeMs;

    private Replication(int maxPendingRetryEntriesPerFollower, long waitingReplicationTimeMs) {
      this.maxPendingRetryEntriesPerFollower = maxPendingRetryEntriesPerFollower;
      this.waitingReplicationTimeMs = waitingReplicationTimeMs;
    }

    /** Upper bound of retry entries buffered per follower (default 100_000). */
    public int getMaxPendingRetryEntriesPerFollower() {
      return maxPendingRetryEntriesPerFollower;
    }

    /** Poll interval, in milliseconds, of the background replication loop (default 1). */
    public long getWaitingReplicationTimeMs() {
      return waitingReplicationTimeMs;
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    /** Builder for {@link Replication}. */
    public static class Builder {

      private int maxPendingRetryEntriesPerFollower = 100_000;
      private long waitingReplicationTimeMs = 1L;

      public Builder setMaxPendingRetryEntriesPerFollower(int maxPendingRetryEntriesPerFollower) {
        this.maxPendingRetryEntriesPerFollower = maxPendingRetryEntriesPerFollower;
        return this;
      }

      public Builder setWaitingReplicationTimeMs(long waitingReplicationTimeMs) {
        this.waitingReplicationTimeMs = waitingReplicationTimeMs;
        return this;
      }

      public Replication build() {
        return new Replication(maxPendingRetryEntriesPerFollower, waitingReplicationTimeMs);
      }
    }
  }

  /** Leader-election settings. */
  public static class Election {

    private final long randomSeed;

    private Election(long randomSeed) {
      this.randomSeed = randomSeed;
    }

    /** Seed for election-timeout randomization; defaults to {@code System.nanoTime()}. */
    public long getRandomSeed() {
      return randomSeed;
    }

    public static Builder newBuilder() {
      return new Builder();
    }

    /** Builder for {@link Election}. */
    public static class Builder {

      // Non-deterministic default so distinct nodes start with distinct timeouts.
      private long randomSeed = System.nanoTime();

      public Builder setRandomSeed(long randomSeed) {
        this.randomSeed = randomSeed;
        return this;
      }

      public Election build() {
        return new Election(randomSeed);
      }
    }
  }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
new file mode 100644
index 00000000000..db4d9875f94
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.TRaftConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TRaftConsensus implements IConsensus {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TRaftConsensus.class);
+
+ private final TEndPoint thisNode;
+ private final int thisNodeId;
+ private final File storageDir;
+ private final IStateMachine.Registry registry;
+ private final Map<ConsensusGroupId, TRaftServerImpl> stateMachineMap = new
ConcurrentHashMap<>();
+
+ private TRaftConfig config;
+ private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
+ public TRaftConsensus(ConsensusConfig config, IStateMachine.Registry
registry) {
+ this.thisNode = config.getThisNodeEndPoint();
+ this.thisNodeId = config.getThisNodeId();
+ this.storageDir = new File(config.getStorageDir());
+ this.registry = registry;
+ this.config = config.getTRaftConfig();
+ }
+
+ @Override
+ public synchronized void start() throws IOException {
+ initAndRecover();
+ stateMachineMap.values().forEach(TRaftServerImpl::start);
+ TRaftNodeRegistry.register(thisNode, this);
+ }
+
+ @Override
+ public synchronized void stop() {
+ TRaftNodeRegistry.unregister(thisNode);
+ stateMachineMap.values().forEach(TRaftServerImpl::stop);
+ }
+
+ @Override
+ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+ throws ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ if (impl.isReadOnly()) {
+ throw new ConsensusException("Current peer is read-only");
+ }
+ return impl.write(request);
+ }
+
+ @Override
+ public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+ throws ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ return impl.read(request);
+ }
+
+ @Override
+ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
throws ConsensusException {
+ List<Peer> effectivePeers = peers;
+ if (effectivePeers == null || effectivePeers.isEmpty()) {
+ effectivePeers =
+ Collections.singletonList(new Peer(groupId, thisNodeId, thisNode));
+ }
+ if (!effectivePeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
+ throw new IllegalPeerEndpointException(thisNode, effectivePeers);
+ }
+ if (effectivePeers.size() < 1) {
+ throw new IllegalPeerNumException(effectivePeers.size());
+ }
+ final List<Peer> finalPeers = effectivePeers;
+ AtomicBoolean alreadyExists = new AtomicBoolean(true);
+ Optional.ofNullable(
+ stateMachineMap.computeIfAbsent(
+ groupId,
+ key -> {
+ alreadyExists.set(false);
+ File peerDir = new File(buildPeerDir(groupId));
+ if (!peerDir.exists() && !peerDir.mkdirs()) {
+ LOGGER.warn("Failed to create TRaft peer dir {}", peerDir);
+ return null;
+ }
+ try {
+ return new TRaftServerImpl(
+ peerDir.getAbsolutePath(),
+ new Peer(groupId, thisNodeId, thisNode),
+ new TreeSet<>(finalPeers),
+ registry.apply(groupId),
+ config);
+ } catch (IOException e) {
+ LOGGER.error("Failed to create TRaft server for {}",
groupId, e);
+ return null;
+ }
+ }))
+ .map(
+ impl -> {
+ impl.start();
+ return impl;
+ })
+ .orElseThrow(
+ () -> new ConsensusException(String.format("Failed to create local
peer %s", groupId)));
+ if (alreadyExists.get()) {
+ throw new ConsensusGroupAlreadyExistException(groupId);
+ }
+ }
+
+ @Override
+ public void deleteLocalPeer(ConsensusGroupId groupId) throws
ConsensusException {
+ AtomicBoolean exist = new AtomicBoolean(false);
+ stateMachineMap.computeIfPresent(
+ groupId,
+ (key, value) -> {
+ exist.set(true);
+ value.stop();
+ FileUtils.deleteFileOrDirectory(new File(buildPeerDir(groupId)));
+ return null;
+ });
+ if (!exist.get()) {
+ throw new ConsensusGroupNotExistException(groupId);
+ }
+ }
+
+ @Override
+ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws
ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ if (impl.getConfiguration().contains(peer)) {
+ throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+ }
+ try {
+ impl.addPeer(peer);
+ } catch (IOException e) {
+ throw new ConsensusException("Failed to add peer in TRaft", e);
+ }
+ }
+
+ @Override
+ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws
ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ if (!impl.getConfiguration().contains(peer)) {
+ throw new PeerNotInConsensusGroupException(groupId, peer.toString());
+ }
+ try {
+ impl.removePeer(peer);
+ } catch (IOException e) {
+ throw new ConsensusException("Failed to remove peer in TRaft", e);
+ }
+ }
+
+ @Override
+ public void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId,
List<Peer>> correctPeerList) {
+ this.correctPeerListBeforeStart = correctPeerList;
+ }
+
+ @Override
+ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
+ throws ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ try {
+ impl.resetPeerList(correctPeers);
+ } catch (IOException e) {
+ throw new ConsensusException("Failed to reset peer list in TRaft", e);
+ }
+ }
+
+ @Override
+ public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws
ConsensusException {
+ TRaftServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ impl.transferLeader(newLeader);
+ }
+
+ @Override
+ public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws
ConsensusException {
+ throw new ConsensusException("TRaft does not support snapshot trigger
currently");
+ }
+
+ @Override
+ public boolean isLeader(ConsensusGroupId groupId) {
+ return
Optional.ofNullable(stateMachineMap.get(groupId)).map(TRaftServerImpl::isLeader).orElse(false);
+ }
+
+ @Override
+ public long getLogicalClock(ConsensusGroupId groupId) {
+ return Optional.ofNullable(stateMachineMap.get(groupId))
+ .map(TRaftServerImpl::getLogicalClock)
+ .orElse(0L);
+ }
+
+ @Override
+ public boolean isLeaderReady(ConsensusGroupId groupId) {
+ return Optional.ofNullable(stateMachineMap.get(groupId))
+ .map(TRaftServerImpl::isLeaderReady)
+ .orElse(false);
+ }
+
+ @Override
+ public Peer getLeader(ConsensusGroupId groupId) {
+ return
Optional.ofNullable(stateMachineMap.get(groupId)).map(TRaftServerImpl::getLeader).orElse(null);
+ }
+
+ @Override
+ public int getReplicationNum(ConsensusGroupId groupId) {
+ return Optional.ofNullable(stateMachineMap.get(groupId))
+ .map(TRaftServerImpl::getConfiguration)
+ .map(List::size)
+ .orElse(0);
+ }
+
+ @Override
+ public List<ConsensusGroupId> getAllConsensusGroupIds() {
+ return new ArrayList<>(stateMachineMap.keySet());
+ }
+
+ @Override
+ public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
+ return buildPeerDir(groupId);
+ }
+
+ @Override
+ public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
+ this.config = consensusConfig.getTRaftConfig();
+ stateMachineMap.values().forEach(server ->
server.reloadConsensusConfig(config));
+ }
+
+ TRaftServerImpl getImpl(ConsensusGroupId groupId) {
+ return stateMachineMap.get(groupId);
+ }
+
+ private void initAndRecover() throws IOException {
+ if (!storageDir.exists() && !storageDir.mkdirs()) {
+ throw new IOException(String.format("Unable to create consensus dir at
%s", storageDir));
+ }
+ try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
+ for (Path path : stream) {
+ String[] items = path.getFileName().toString().split("_");
+ if (items.length != 2) {
+ continue;
+ }
+ ConsensusGroupId consensusGroupId =
+ ConsensusGroupId.Factory.create(
+ Integer.parseInt(items[0]), Integer.parseInt(items[1]));
+ TRaftServerImpl consensus =
+ new TRaftServerImpl(
+ path.toString(),
+ new Peer(consensusGroupId, thisNodeId, thisNode),
+ new TreeSet<>(),
+ registry.apply(consensusGroupId),
+ config);
+ stateMachineMap.put(consensusGroupId, consensus);
+ }
+ }
+ if (correctPeerListBeforeStart != null) {
+ for (Map.Entry<ConsensusGroupId, List<Peer>> entry :
correctPeerListBeforeStart.entrySet()) {
+ TRaftServerImpl impl = stateMachineMap.get(entry.getKey());
+ if (impl == null) {
+ continue;
+ }
+ impl.resetPeerList(entry.getValue());
+ }
+ }
+ }
+
+ private String buildPeerDir(ConsensusGroupId groupId) {
+ return storageDir + File.separator + groupId.getType().getValue() + "_" +
groupId.getId();
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
new file mode 100644
index 00000000000..15aa3f01470
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
/**
 * Per-follower replication bookkeeping for TRaft.
 *
 * <p>Tracks which partition/log indices are currently in flight to a follower, counts successful
 * memory- and disk-based replications, and keeps entries that could not be sent immediately
 * ("delayed") grouped by partition so they can later be replayed from disk in order.
 *
 * <p>Not thread-safe; callers are expected to provide external synchronization.
 */
public class TRaftFollowerInfo {

  private long currentReplicatingPartitionIndex;
  private final Set<Long> currentReplicatingIndices = new HashSet<>();
  private long nextPartitionIndex;
  private long nextPartitionFirstIndex;
  private long memoryReplicationSuccessCount;
  private long diskReplicationSuccessCount;

  // Delayed log indices keyed by partition; both levels sorted so the oldest
  // partition/entry is always cheap to find when replaying from disk.
  private final NavigableMap<Long, NavigableSet<Long>> delayedIndicesByPartition = new TreeMap<>();

  public TRaftFollowerInfo(long currentReplicatingPartitionIndex, long nextPartitionFirstIndex) {
    this.currentReplicatingPartitionIndex = currentReplicatingPartitionIndex;
    // With no delayed entries yet, the next partition to replay is the current one.
    this.nextPartitionIndex = currentReplicatingPartitionIndex;
    this.nextPartitionFirstIndex = nextPartitionFirstIndex;
  }

  public long getCurrentReplicatingPartitionIndex() {
    return currentReplicatingPartitionIndex;
  }

  public void setCurrentReplicatingPartitionIndex(long currentReplicatingPartitionIndex) {
    this.currentReplicatingPartitionIndex = currentReplicatingPartitionIndex;
  }

  /** Returns the live (mutable) set of in-flight log indices. */
  public Set<Long> getCurrentReplicatingIndices() {
    return currentReplicatingIndices;
  }

  public long getNextPartitionIndex() {
    return nextPartitionIndex;
  }

  public void setNextPartitionIndex(long nextPartitionIndex) {
    this.nextPartitionIndex = nextPartitionIndex;
  }

  public long getNextPartitionFirstIndex() {
    return nextPartitionFirstIndex;
  }

  public void setNextPartitionFirstIndex(long nextPartitionFirstIndex) {
    this.nextPartitionFirstIndex = nextPartitionFirstIndex;
  }

  /** Marks {@code index} as in flight to this follower. */
  public void addCurrentReplicatingIndex(long index) {
    currentReplicatingIndices.add(index);
  }

  /** Clears {@code index} from the in-flight set. */
  public void removeCurrentReplicatingIndex(long index) {
    currentReplicatingIndices.remove(index);
  }

  /** True iff at least one entry is currently in flight. */
  public boolean hasCurrentReplicatingIndex() {
    return !currentReplicatingIndices.isEmpty();
  }

  public long getMemoryReplicationSuccessCount() {
    return memoryReplicationSuccessCount;
  }

  public long getDiskReplicationSuccessCount() {
    return diskReplicationSuccessCount;
  }

  /** Increments the counter of successful memory-path replications. */
  public void recordMemoryReplicationSuccess() {
    ++memoryReplicationSuccessCount;
  }

  /** Increments the counter of successful disk-path replications. */
  public void recordDiskReplicationSuccess() {
    ++diskReplicationSuccessCount;
  }

  /** Registers a delayed entry and re-derives the next-partition pointers. */
  public void addDelayedIndex(long partitionIndex, long logIndex) {
    NavigableSet<Long> bucket =
        delayedIndicesByPartition.computeIfAbsent(partitionIndex, unused -> new TreeSet<>());
    bucket.add(logIndex);
    refreshNextPartitionPointers();
  }

  /** Drops a delayed entry (no-op for unknown partitions) and re-derives the pointers. */
  public void removeDelayedIndex(long partitionIndex, long logIndex) {
    NavigableSet<Long> bucket = delayedIndicesByPartition.get(partitionIndex);
    if (bucket != null) {
      bucket.remove(logIndex);
      if (bucket.isEmpty()) {
        // Drop empty buckets so firstKey()/firstEntry() always point at real work.
        delayedIndicesByPartition.remove(partitionIndex);
      }
      refreshNextPartitionPointers();
    }
  }

  /** Lowest partition with delayed entries, or null if there are none. */
  public Long getFirstDelayedPartitionIndex() {
    if (delayedIndicesByPartition.isEmpty()) {
      return null;
    }
    return delayedIndicesByPartition.firstKey();
  }

  /** Lowest delayed log index in {@code partitionIndex}, or null if none. */
  public Long getFirstDelayedIndexOfPartition(long partitionIndex) {
    NavigableSet<Long> bucket = delayedIndicesByPartition.get(partitionIndex);
    if (bucket == null || bucket.isEmpty()) {
      return null;
    }
    return bucket.first();
  }

  /** True iff any delayed entries remain. */
  public boolean hasDelayedEntries() {
    return !delayedIndicesByPartition.isEmpty();
  }

  /** Total number of delayed entries across all partitions. */
  public long delayedEntryCount() {
    long total = 0;
    for (NavigableSet<Long> bucket : delayedIndicesByPartition.values()) {
      total += bucket.size();
    }
    return total;
  }

  /**
   * Points {@code nextPartitionIndex}/{@code nextPartitionFirstIndex} at the oldest delayed
   * entry; with nothing delayed, the next partition falls back to the current one (the
   * first-index pointer is intentionally left as-is in that case).
   */
  private void refreshNextPartitionPointers() {
    Map.Entry<Long, NavigableSet<Long>> oldest = delayedIndicesByPartition.firstEntry();
    if (oldest == null) {
      nextPartitionIndex = currentReplicatingPartitionIndex;
    } else {
      nextPartitionIndex = oldest.getKey();
      nextPartitionFirstIndex = oldest.getValue().first();
    }
  }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
new file mode 100644
index 00000000000..c5bb758654f
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.consensus.common.Peer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Per-follower background appender thread.
+ *
+ * <p>Runs an infinite loop that polls every {@code waitingReplicationTimeMs}
milliseconds and calls
+ * back into {@link TRaftServerImpl#tryReplicateDiskEntriesToFollower} to
dispatch any log entries
+ * that are on disk but have not yet been sent to the target follower. This
handles two scenarios:
+ *
+ * <ul>
+ * <li>Entries whose partition index was higher than the leader's current
inserting partition at
+ * write time (the entry went straight to disk and was deferred).
+ * <li>Entries that could not be sent synchronously because the follower was
offline during the
+ * original write, or because the previous partition's quorum commit is
still pending.
+ * </ul>
+ */
+class TRaftLogAppender implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TRaftLogAppender.class);
+
+ private final TRaftServerImpl server;
+ private final Peer follower;
+ private final long waitingReplicationTimeMs;
+
+ private volatile boolean stopped = false;
+ private final CountDownLatch runFinished = new CountDownLatch(1);
+
+ TRaftLogAppender(TRaftServerImpl server, Peer follower, long
waitingReplicationTimeMs) {
+ this.server = server;
+ this.follower = follower;
+ this.waitingReplicationTimeMs = waitingReplicationTimeMs;
+ }
+
+ /** Signal the appender to stop after the current iteration completes. */
+ void stop() {
+ stopped = true;
+ }
+
+ /**
+ * Block until the appender thread has exited, up to {@code timeoutSeconds}
seconds.
+ *
+ * @return {@code true} if the thread exited within the timeout, {@code
false} otherwise.
+ */
+ boolean awaitTermination(long timeoutSeconds) throws InterruptedException {
+ return runFinished.await(timeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("TRaftLogAppender started for follower {}", follower);
+ try {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ try {
+ // Attempt to replicate any disk entries that are ready for this
follower.
+ server.tryReplicateDiskEntriesToFollower(follower);
+ } catch (Exception e) {
+ LOGGER.warn("Error during disk replication to {}: {}", follower,
e.getMessage(), e);
+ }
+ TimeUnit.MILLISECONDS.sleep(waitingReplicationTimeMs);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ runFinished.countDown();
+ LOGGER.info("TRaftLogAppender stopped for follower {}", follower);
+ }
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
new file mode 100644
index 00000000000..66542177124
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import java.util.Arrays;
+
/**
 * Immutable metadata-plus-payload record for a single TRaft log entry.
 *
 * <p>{@link #getData()} hands back the internal payload array without copying; callers that need
 * an isolated snapshot should use {@link #copy()}, which duplicates the payload.
 */
public class TRaftLogEntry {

  private final long timestamp;
  private final long partitionIndex;
  private final long logIndex;
  private final long logTerm;
  private final long interPartitionIndex;
  private final long lastPartitionCount;
  private final byte[] data;

  public TRaftLogEntry(
      long timestamp,
      long partitionIndex,
      long logIndex,
      long logTerm,
      long interPartitionIndex,
      long lastPartitionCount,
      byte[] data) {
    this.timestamp = timestamp;
    this.partitionIndex = partitionIndex;
    this.logIndex = logIndex;
    this.logTerm = logTerm;
    this.interPartitionIndex = interPartitionIndex;
    this.lastPartitionCount = lastPartitionCount;
    this.data = data;
  }

  /** @return the timestamp carried by the original client request. */
  public long getTimestamp() {
    return timestamp;
  }

  /** @return the time-partition this entry belongs to. */
  public long getPartitionIndex() {
    return partitionIndex;
  }

  /** @return the global log index of this entry. */
  public long getLogIndex() {
    return logIndex;
  }

  /** @return the leader term under which this entry was written. */
  public long getLogTerm() {
    return logTerm;
  }

  /** @return the entry's position within its own partition. */
  public long getInterPartitionIndex() {
    return interPartitionIndex;
  }

  /** @return the entry count of the preceding partition. */
  public long getLastPartitionCount() {
    return lastPartitionCount;
  }

  /** @return the raw serialized request payload (internal array, not a copy). */
  public byte[] getData() {
    return data;
  }

  /** @return a deep copy of this entry whose payload array is independent of this one's. */
  public TRaftLogEntry copy() {
    byte[] dataSnapshot = Arrays.copyOf(data, data.length);
    return new TRaftLogEntry(
        timestamp,
        partitionIndex,
        logIndex,
        logTerm,
        interPartitionIndex,
        lastPartitionCount,
        dataSnapshot);
  }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
new file mode 100644
index 00000000000..0a578a22fcb
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class TRaftLogStore {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TRaftLogStore.class);
+ private static final String LOG_FILE_NAME = "traft.log";
+
+ private final File logFile;
+ private final Map<Long, TRaftLogEntry> logEntries = new LinkedHashMap<>();
+
+ TRaftLogStore(String storageDir) throws IOException {
+ this.logFile = new File(storageDir, LOG_FILE_NAME);
+ File parent = logFile.getParentFile();
+ if (parent != null && !parent.exists() && !parent.mkdirs()) {
+ throw new IOException(String.format("Failed to create TRaft dir %s",
parent));
+ }
+ if (!logFile.exists() && !logFile.createNewFile()) {
+ throw new IOException(String.format("Failed to create TRaft log file
%s", logFile));
+ }
+ loadAll();
+ }
+
+ synchronized void append(TRaftLogEntry entry) throws IOException {
+ if (logEntries.containsKey(entry.getLogIndex())) {
+ return;
+ }
+ try (BufferedWriter writer =
+ Files.newBufferedWriter(
+ logFile.toPath(), StandardCharsets.UTF_8,
StandardOpenOption.APPEND)) {
+ writer.write(serialize(entry));
+ writer.newLine();
+ }
+ logEntries.put(entry.getLogIndex(), entry.copy());
+ }
+
+ synchronized TRaftLogEntry getByIndex(long index) {
+ TRaftLogEntry entry = logEntries.get(index);
+ return entry == null ? null : entry.copy();
+ }
+
+ synchronized boolean contains(long index) {
+ return logEntries.containsKey(index);
+ }
+
+ synchronized TRaftLogEntry getLastEntry() {
+ if (logEntries.isEmpty()) {
+ return null;
+ }
+ TRaftLogEntry last = null;
+ for (TRaftLogEntry entry : logEntries.values()) {
+ last = entry;
+ }
+ return last == null ? null : last.copy();
+ }
+
+ synchronized List<TRaftLogEntry> getAllEntries() {
+ List<TRaftLogEntry> result = new ArrayList<>(logEntries.size());
+ for (TRaftLogEntry entry : logEntries.values()) {
+ result.add(entry.copy());
+ }
+ return Collections.unmodifiableList(result);
+ }
+
+ private void loadAll() throws IOException {
+ try (BufferedReader reader = Files.newBufferedReader(logFile.toPath(),
StandardCharsets.UTF_8)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.isEmpty()) {
+ continue;
+ }
+ TRaftLogEntry entry = deserialize(line);
+ logEntries.put(entry.getLogIndex(), entry);
+ }
+ }
+ }
+
+ private String serialize(TRaftLogEntry entry) {
+ return entry.getLogIndex()
+ + ","
+ + entry.getLogTerm()
+ + ","
+ + entry.getTimestamp()
+ + ","
+ + entry.getPartitionIndex()
+ + ","
+ + entry.getInterPartitionIndex()
+ + ","
+ + entry.getLastPartitionCount()
+ + ","
+ + Base64.getEncoder().encodeToString(entry.getData());
+ }
+
+ private TRaftLogEntry deserialize(String line) {
+ String[] fields = line.split(",", 7);
+ if (fields.length != 7) {
+ throw new IllegalArgumentException("Invalid TRaft log line: " + line);
+ }
+ try {
+ long logIndex = Long.parseLong(fields[0]);
+ long logTerm = Long.parseLong(fields[1]);
+ long timestamp = Long.parseLong(fields[2]);
+ long partitionIndex = Long.parseLong(fields[3]);
+ long interPartitionIndex = Long.parseLong(fields[4]);
+ long lastPartitionCount = Long.parseLong(fields[5]);
+ byte[] data = Base64.getDecoder().decode(fields[6]);
+ return new TRaftLogEntry(
+ timestamp,
+ partitionIndex,
+ logIndex,
+ logTerm,
+ interPartitionIndex,
+ lastPartitionCount,
+ data);
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse TRaft log line {}", line, e);
+ throw e;
+ }
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
new file mode 100644
index 00000000000..098dcb7a4ed
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.common.Peer;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+class TRaftNodeRegistry {
+
+ private static final Map<String, TRaftConsensus> CONSENSUS_BY_ENDPOINT = new
ConcurrentHashMap<>();
+
+ private TRaftNodeRegistry() {}
+
+ static void register(TEndPoint endpoint, TRaftConsensus consensus) {
+ CONSENSUS_BY_ENDPOINT.put(toEndpointKey(endpoint), consensus);
+ }
+
+ static void unregister(TEndPoint endpoint) {
+ CONSENSUS_BY_ENDPOINT.remove(toEndpointKey(endpoint));
+ }
+
+ static Optional<TRaftServerImpl> resolveServer(Peer peer) {
+ TRaftConsensus consensus =
CONSENSUS_BY_ENDPOINT.get(toEndpointKey(peer.getEndpoint()));
+ if (consensus == null) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(consensus.getImpl(peer.getGroupId()));
+ }
+
+ private static String toEndpointKey(TEndPoint endpoint) {
+ return endpoint.getIp() + ":" + endpoint.getPort();
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
new file mode 100644
index 00000000000..2ce516cedf9
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+
+import java.nio.ByteBuffer;
+
+class TRaftRequestParser {
+
+ private TRaftRequestParser() {}
+
+ static long extractTimestamp(IConsensusRequest request, long
fallbackTimestamp) {
+ if (request.hasTime()) {
+ return request.getTime();
+ }
+ ByteBuffer buffer = request.serializeToByteBuffer().duplicate();
+ if (buffer.remaining() < Long.BYTES) {
+ return fallbackTimestamp;
+ }
+ return buffer.getLong(buffer.position());
+ }
+
+ static byte[] extractRawRequest(IConsensusRequest request) {
+ ByteBuffer buffer = request.serializeToByteBuffer().duplicate();
+ byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
+ }
+
+ static IConsensusRequest buildRequest(byte[] rawRequest) {
+ return new ByteBufferConsensusRequest(ByteBuffer.wrap(rawRequest));
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
similarity index 50%
copy from
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
index cae54c2bcbc..39ff57143c9 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
@@ -17,31 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.consensus.common.request;
+package org.apache.iotdb.consensus.traft;
-import java.nio.ByteBuffer;
-
-public interface IConsensusRequest {
- /**
- * Serialize all the data to a ByteBuffer.
- *
- * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to
reduce the number of
- * memory copies.
- *
- * <p>To improve efficiency, a specific implementation could return a
DirectByteBuffer to reduce
- * the memory copy required to send an RPC
- *
- * <p>Note: The implementation needs to ensure that the data in the returned
Bytebuffer cannot be
- * changed or an error may occur
- */
- ByteBuffer serializeToByteBuffer();
-
- default long getMemorySize() {
- // return 0 by default
- return 0;
- }
-
- default void markAsGeneratedByRemoteConsensusLeader() {
- // do nothing by default
- }
/** The Raft-style roles a TRaft peer can hold. */
public enum TRaftRole {
  /** Accepts client writes and drives replication to followers. */
  LEADER,
  /** Receives and applies log entries replicated from the leader. */
  FOLLOWER,
  /** Transitional role used while requesting votes in an election. */
  CANDIDATE
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
new file mode 100644
index 00000000000..d48dbe35f3b
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.TRaftConfig;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+class TRaftServerImpl {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TRaftServerImpl.class);
+ private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+ private static final long INITIAL_PARTITION_INDEX = 0;
+
+ // ── persistent / structural state
───────────────────────────────────────────
+ private final String storageDir;
+ private final Peer thisNode;
+ private final IStateMachine stateMachine;
+ private final TreeSet<Peer> configuration = new TreeSet<>();
+ private final ConcurrentHashMap<Integer, TRaftFollowerInfo> followerInfoMap =
+ new ConcurrentHashMap<>();
+ private final TRaftLogStore logStore;
+ private final AtomicLong logicalClock = new AtomicLong(0);
+ private final Random random;
+
+ // ── volatile node state
──────────────────────────────────────────────────────
+ private volatile TRaftRole role = TRaftRole.FOLLOWER;
+ private volatile boolean active = true;
+ private volatile boolean started = false;
+
+ // ── term / election state
────────────────────────────────────────────────────
+ private long currentTerm = 0;
+ private int leaderId = -1;
+ private int votedFor = -1;
+
+ // ── partition tracking
───────────────────────────────────────────────────────
+ private long historicalMaxTimestamp = Long.MIN_VALUE;
+ private long maxPartitionIndex = INITIAL_PARTITION_INDEX;
+ private long currentPartitionIndexCount = 0;
+
+ /**
+ * The partition index of entries currently being actively inserted by the
leader (fast path).
+ * Entries arriving with a partition index equal to this value are eligible
for direct
+ * in-memory replication to matching followers. Entries with a higher
partition index are
+ * written directly to disk and replicated by the per-follower {@link
TRaftLogAppender}.
+ */
+ private long currentLeaderInsertingPartitionIndex = INITIAL_PARTITION_INDEX;
+
+ // ── quorum commit tracking
───────────────────────────────────────────────────
+ /**
+ * Tracks, per log index, how many followers have acknowledged that entry.
Once the ACK count
+ * reaches ⌈followerCount/2⌉ the entry is considered committed and removed
from the map.
+ * The map becoming empty signals that all entries of the current partition
have achieved
+ * quorum, allowing the next partition's entries to be transmitted.
+ */
+ private final ConcurrentHashMap<Long, AtomicInteger>
currentReplicationgIndicesToAckFollowerCount =
+ new ConcurrentHashMap<>();
+
+ // ── background appenders
─────────────────────────────────────────────────────
+ /** One background appender thread per follower, keyed by follower node-id.
*/
+ private final ConcurrentHashMap<Integer, TRaftLogAppender> appenderMap =
+ new ConcurrentHashMap<>();
+
+ private ExecutorService appenderExecutor;
+
+ private TRaftConfig config;
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Construction
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Build a TRaft replica for one consensus group.
   *
   * <p>If {@code peers} is empty the membership is restored from disk (restart path); otherwise
   * the given peers become the configuration and are persisted. The local node is always forced
   * into the configuration. Finally the log is recovered from disk, an initial leader is chosen,
   * and per-follower bookkeeping is initialized.
   *
   * @throws IOException if the log store cannot be created or read
   */
  TRaftServerImpl(
      String storageDir,
      Peer thisNode,
      TreeSet<Peer> peers,
      IStateMachine stateMachine,
      TRaftConfig config)
      throws IOException {
    this.storageDir = storageDir;
    this.thisNode = thisNode;
    this.stateMachine = stateMachine;
    this.config = config;
    this.logStore = new TRaftLogStore(storageDir);
    // Offset the seed by node id so peers draw different (but reproducible) random sequences.
    this.random = new Random(config.getElection().getRandomSeed() + thisNode.getNodeId());
    if (peers.isEmpty()) {
      // Restart path: recover membership persisted by a previous run.
      this.configuration.addAll(loadConfigurationFromDisk());
    } else {
      this.configuration.addAll(peers);
      persistConfiguration();
    }
    if (!this.configuration.contains(thisNode)) {
      this.configuration.add(thisNode);
      persistConfiguration();
    }
    recoverFromDisk();
    electInitialLeader();
    initFollowerInfoMap();
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Lifecycle
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Start the state machine and, when this node is the leader, its follower appenders.
   * Idempotent: a second call is a no-op.
   */
  public synchronized void start() {
    if (started) {
      return;
    }
    stateMachine.start();
    started = true;
    notifyLeaderChanged();
    if (role == TRaftRole.LEADER) {
      initAppenders();
    }
  }
+
  /** Stop background appenders and the state machine. Safe to call more than once. */
  public synchronized void stop() {
    // Mark as stopped first so appender callbacks become no-ops immediately.
    started = false;
    stopAppenders();
    stateMachine.stop();
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Write path (Leader)
+ //
────────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Process a client write request as the leader.
+ *
+ * <p><b>Step 1 – Parse:</b> Build a {@link TRaftLogEntry} with timestamp
and partition index
+ * derived from the request.
+ *
+ * <p><b>Step 2 – Route by partition index:</b>
+ * <ul>
+ * <li>If the entry's partition index is <em>greater than</em>
+ * {@code currentLeaderInsertingPartitionIndex}: the entry and all
subsequent entries go
+ * directly to disk. The partition index pointer is advanced. A
best-effort quorum-commit
+ * check is performed (if the map is already empty the pointer
advances cleanly; otherwise
+ * it advances anyway and the appender will enforce ordering). All
followers receive the
+ * entry in their delayed queue; disk dispatch is attempted
synchronously.
+ * <li>If the entry's partition index <em>equals</em>
+ * {@code currentLeaderInsertingPartitionIndex}: each follower is
inspected individually.
+ * Followers whose {@code currentFollowerReplicatingPartitionIndex}
matches are sent the
+ * entry immediately (fast path) and the index is recorded in
+ * {@code currentReplicatingIndices}. Followers that are behind
receive it as a delayed
+ * entry and disk dispatch is attempted. The entry is then persisted
to disk.
+ * </ul>
+ *
+ * <p><b>Step 3 – Quorum tracking:</b> Each sent entry is registered in
+ * {@code currentReplicationgIndicesToAckFollowerCount}. When a follower
acknowledges the entry,
+ * its ACK count is incremented; once ≥ half the follower count the entry is
removed from the
+ * map. When the map empties the current partition is fully committed and
the next partition may
+ * be transmitted.
+ */
  public synchronized TSStatus write(IConsensusRequest request) {
    // Reject early when this peer cannot legally accept writes.
    if (!active) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          String.format("Peer %s is inactive and cannot process writes", thisNode));
    }
    if (role != TRaftRole.LEADER) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          String.format("Peer %s is not leader, current leader id: %s", thisNode, leaderId));
    }

    TRaftLogEntry logEntry = buildLogEntry(request);

    // Apply to local state machine immediately (leader apply-on-write model).
    // NOTE(review): the entry is applied locally before any quorum is reached; a leader crash
    // after this point can leave the local state machine ahead of the replicated log — confirm
    // this is acceptable for the target time-series workloads.
    TSStatus localStatus = stateMachine.write(stateMachine.deserializeRequest(request));
    if (!isSuccess(localStatus)) {
      return localStatus;
    }

    List<Peer> followers = getFollowers();
    int followerCount = followers.size();
    long entryPartitionIndex = logEntry.getPartitionIndex();

    if (entryPartitionIndex > currentLeaderInsertingPartitionIndex) {
      // ── Higher partition: write directly to disk ─────────────────────────
      // Log a debug note if there are still uncommitted entries from the old partition.
      if (!currentReplicationgIndicesToAckFollowerCount.isEmpty()) {
        LOGGER.debug(
            "Partition boundary crossed from {} to {} with {} uncommitted indices pending quorum",
            currentLeaderInsertingPartitionIndex,
            entryPartitionIndex,
            currentReplicationgIndicesToAckFollowerCount.size());
      }
      currentLeaderInsertingPartitionIndex = entryPartitionIndex;

      try {
        logStore.append(logEntry);
      } catch (IOException e) {
        LOGGER.error("Failed to append TRaft log entry {}", logEntry.getLogIndex(), e);
        return RpcUtils.getStatus(
            TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to persist TRaft log entry");
      }
      logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
      updatePartitionIndexStat(logEntry);

      // Add to every follower's delayed queue; attempt synchronous dispatch.
      for (Peer follower : followers) {
        TRaftFollowerInfo info =
            followerInfoMap.computeIfAbsent(
                follower.getNodeId(),
                // NOTE(review): the fast path below seeds new followers with
                // logicalClock.get() + 1, while this branch uses logicalClock.get() —
                // confirm the off-by-one difference is intentional.
                key -> new TRaftFollowerInfo(getLatestPartitionIndex(), logicalClock.get()));
        info.addDelayedIndex(logEntry.getPartitionIndex(), logEntry.getLogIndex());
        limitDelayedEntriesIfNecessary(info);
        dispatchNextDelayedEntry(follower, info);
      }

    } else {
      // ── Same partition: attempt fast-path replication per follower ────────
      if (followerCount > 0) {
        currentReplicationgIndicesToAckFollowerCount.put(
            logEntry.getLogIndex(), new AtomicInteger(0));
      }

      List<Peer> delayedFollowers = new ArrayList<>();
      for (Peer follower : followers) {
        TRaftFollowerInfo info =
            followerInfoMap.computeIfAbsent(
                follower.getNodeId(),
                key -> new TRaftFollowerInfo(getLatestPartitionIndex(), logicalClock.get() + 1));

        if (entryPartitionIndex == info.getCurrentReplicatingPartitionIndex()) {
          // Fast path: record index and send directly.
          info.addCurrentReplicatingIndex(logEntry.getLogIndex());
          boolean success = sendEntryToFollower(follower, logEntry);
          if (success) {
            info.recordMemoryReplicationSuccess();
            onFollowerAck(follower, info, logEntry);
          } else {
            onFollowerSendFailed(info, logEntry);
            delayedFollowers.add(follower);
          }
        } else if (entryPartitionIndex > info.getCurrentReplicatingPartitionIndex()) {
          // Follower is behind – defer to disk path.
          delayedFollowers.add(follower);
        }
        // entryPartitionIndex < info.getCurrentReplicatingPartitionIndex() should not happen.
      }

      // Persist to disk after in-memory replication attempts.
      try {
        logStore.append(logEntry);
      } catch (IOException e) {
        LOGGER.error("Failed to append TRaft log entry {}", logEntry.getLogIndex(), e);
        return RpcUtils.getStatus(
            TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to persist TRaft log entry");
      }
      logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
      updatePartitionIndexStat(logEntry);

      // For delayed followers: enqueue + synchronous disk dispatch.
      for (Peer follower : delayedFollowers) {
        TRaftFollowerInfo info = followerInfoMap.get(follower.getNodeId());
        if (info == null) {
          continue;
        }
        info.addDelayedIndex(logEntry.getPartitionIndex(), logEntry.getLogIndex());
        limitDelayedEntriesIfNecessary(info);
        dispatchNextDelayedEntry(follower, info);
      }
    }

    return localStatus;
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Read path
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Serve a read directly from the local state machine.
   *
   * <p>No read-index or lease check is visible here, so reads served by a follower or a stale
   * leader may observe state that lags the quorum-committed log.
   */
  public synchronized DataSet read(IConsensusRequest request) {
    return stateMachine.read(request);
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Follower RPC handler
+ //
────────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Receive a replicated log entry from the leader.
+ *
+ * <p><b>Term check:</b>
+ * <ul>
+ * <li>If the leader's term is less than the follower's current term the
request is rejected
+ * so the leader can discover it is stale and step down.
+ * <li>If the leader's term is greater the follower updates its own term
and resets its vote.
+ * </ul>
+ *
+ * <p><b>Partition index check (after term is accepted):</b>
+ * <ul>
+ * <li>If the entry's partition index is <em>less than</em> the follower's
current maximum
+ * partition index, the follower is ahead of what the leader is
sending – this is an
+ * inconsistency and an error is returned.
+ * <li>If the entry's partition index is equal to or greater than the
follower's maximum, the
+ * entry is accepted, persisted, and applied to the state machine.
+ * </ul>
+ */
  synchronized TSStatus receiveReplicatedLog(TRaftLogEntry logEntry, int newLeaderId, long term) {
    // ── Term comparison ──────────────────────────────────────────────────────
    if (term < currentTerm) {
      LOGGER.warn(
          "Rejecting replicated log from leader {} with stale term {} (currentTerm={})",
          newLeaderId,
          term,
          currentTerm);
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          "Leader term " + term + " is stale; follower currentTerm=" + currentTerm);
    }
    if (term > currentTerm) {
      // Newer leader term observed: adopt it and clear any vote cast in the old term.
      currentTerm = term;
      votedFor = -1;
    }
    becomeFollower(newLeaderId);

    // Idempotency: already have this log index.
    if (logStore.contains(logEntry.getLogIndex())) {
      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }

    // ── Partition index comparison ───────────────────────────────────────────
    if (logEntry.getPartitionIndex() < maxPartitionIndex) {
      // Follower's partition is ahead of what the leader sent – inconsistency.
      LOGGER.error(
          "Partition inconsistency on follower {}: entry partitionIndex={}, follower maxPartitionIndex={}",
          thisNode,
          logEntry.getPartitionIndex(),
          maxPartitionIndex);
      return RpcUtils.getStatus(
          TSStatusCode.INTERNAL_SERVER_ERROR,
          "Partition index inconsistency: follower maxPartition="
              + maxPartitionIndex
              + " > entry partition="
              + logEntry.getPartitionIndex());
    }

    // ── Persist and apply ────────────────────────────────────────────────────
    try {
      logStore.append(logEntry);
    } catch (IOException e) {
      LOGGER.error("Failed to persist replicated TRaft log {}", logEntry.getLogIndex(), e);
      return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to persist log");
    }

    logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
    updatePartitionIndexStat(logEntry);

    // Rebuild the original request from the raw payload, tag it as leader-replicated
    // (presumably so downstream components do not re-forward it — confirm), and apply it.
    IConsensusRequest deserializedRequest =
        stateMachine.deserializeRequest(TRaftRequestParser.buildRequest(logEntry.getData()));
    deserializedRequest.markAsGeneratedByRemoteConsensusLeader();
    return stateMachine.write(deserializedRequest);
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Election
+ //
────────────────────────────────────────────────────────────────────────────
+
+ synchronized TRaftVoteResult requestVote(TRaftVoteRequest voteRequest) {
+ if (voteRequest.getTerm() < currentTerm) {
+ return new TRaftVoteResult(false, currentTerm);
+ }
+ if (voteRequest.getTerm() > currentTerm) {
+ currentTerm = voteRequest.getTerm();
+ votedFor = voteRequest.getCandidateId();
+ becomeFollower(-1);
+ return new TRaftVoteResult(true, currentTerm);
+ }
+ if (votedFor != -1 && votedFor != voteRequest.getCandidateId()) {
+ return new TRaftVoteResult(false, currentTerm);
+ }
+ int freshnessCompareResult =
+ compareCandidateFreshness(
+ voteRequest.getPartitionIndex(),
+ voteRequest.getCurrentPartitionIndexCount(),
+ maxPartitionIndex,
+ currentPartitionIndexCount);
+ boolean shouldVote = freshnessCompareResult > 0;
+ if (!shouldVote && freshnessCompareResult == 0) {
+ shouldVote = random.nextBoolean();
+ }
+ if (!shouldVote) {
+ return new TRaftVoteResult(false, currentTerm);
+ }
+ votedFor = voteRequest.getCandidateId();
+ return new TRaftVoteResult(true, currentTerm);
+ }
+
  /**
   * Runs one round of leader election with this node as the candidate.
   *
   * <p>Increments the term, votes for itself, then solicits votes synchronously from every
   * follower reachable through {@link TRaftNodeRegistry}; unresolvable followers simply
   * contribute no vote. If any reply carries a higher term this node adopts it and steps down
   * immediately. Winning requires a strict majority of the full configuration size
   * ({@code grantVotes > configuration.size() / 2}).
   *
   * <p>NOTE(review): votes are requested by directly invoking the synchronized
   * {@code requestVote} of in-process peer servers while holding this server's monitor —
   * confirm this cannot deadlock when two nodes campaign simultaneously.
   *
   * @return true iff this node won the election and became leader
   */
  synchronized boolean campaignLeader() {
    if (!active) {
      return false;
    }
    role = TRaftRole.CANDIDATE;
    leaderId = -1;
    currentTerm++;
    votedFor = thisNode.getNodeId();
    int grantVotes = 1; // our own vote
    TRaftVoteRequest voteRequest =
        new TRaftVoteRequest(
            thisNode.getNodeId(), currentTerm, maxPartitionIndex, currentPartitionIndexCount);
    for (Peer follower : getFollowers()) {
      Optional<TRaftServerImpl> followerServer = TRaftNodeRegistry.resolveServer(follower);
      if (!followerServer.isPresent()) {
        continue;
      }
      TRaftVoteResult voteResult = followerServer.get().requestVote(voteRequest);
      if (voteResult.getTerm() > currentTerm) {
        // A peer is already in a newer term: abandon the campaign and follow.
        currentTerm = voteResult.getTerm();
        becomeFollower(-1);
        return false;
      }
      if (voteResult.isGranted()) {
        grantVotes++;
      }
    }
    if (grantVotes > configuration.size() / 2) {
      becomeLeader();
      return true;
    }
    becomeFollower(-1);
    return false;
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Configuration management
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Attempts to hand leadership to {@code newLeader} by asking it to campaign.
   *
   * <p>Best-effort: if the target cannot be resolved, or its campaign fails, leadership is
   * unchanged. When the target is this node it simply campaigns itself.
   *
   * <p>NOTE(review): the target's synchronized {@code campaignLeader()} is invoked while this
   * server's monitor is held, and the target calls back into our {@code requestVote} —
   * confirm the lock ordering cannot deadlock with a concurrent transfer in the opposite
   * direction.
   */
  synchronized void transferLeader(Peer newLeader) {
    if (Objects.equals(newLeader, thisNode)) {
      campaignLeader();
      return;
    }
    Optional<TRaftServerImpl> targetServer = TRaftNodeRegistry.resolveServer(newLeader);
    if (!targetServer.isPresent()) {
      return;
    }
    targetServer.get().campaignLeader();
    if (targetServer.get().isLeader()) {
      becomeFollower(newLeader.getNodeId());
    }
  }
+
+ synchronized void resetPeerList(List<Peer> newPeers) throws IOException {
+ configuration.clear();
+ configuration.addAll(newPeers);
+ if (!configuration.contains(thisNode)) {
+ configuration.add(thisNode);
+ }
+ persistConfiguration();
+ electInitialLeader();
+ initFollowerInfoMap();
+ if (started && role == TRaftRole.LEADER) {
+ stopAppenders();
+ initAppenders();
+ }
+ }
+
+ synchronized void addPeer(Peer peer) throws IOException {
+ configuration.add(peer);
+ persistConfiguration();
+ initFollowerInfoMap();
+ if (started && role == TRaftRole.LEADER) {
+ stopAppenders();
+ initAppenders();
+ }
+ }
+
+ synchronized void removePeer(Peer peer) throws IOException {
+ configuration.remove(peer);
+ persistConfiguration();
+ initFollowerInfoMap();
+ if (started && role == TRaftRole.LEADER) {
+ stopAppenders();
+ initAppenders();
+ }
+ if (peer.getNodeId() == leaderId) {
+ electInitialLeader();
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Background appender: called by TRaftLogAppender every
waitingReplicationTimeMs
+ //
────────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Entry point for the per-follower {@link TRaftLogAppender} background
thread.
+ *
+ * <p>Checks whether the follower has pending delayed entries and, if so,
attempts to dispatch
+ * the next one from disk. This provides a periodic retry mechanism for
followers that were
+ * offline during the original write, and ensures that entries written to
disk during partition
+ * transitions are eventually transmitted.
+ */
+ synchronized void tryReplicateDiskEntriesToFollower(Peer follower) {
+ if (!started || role != TRaftRole.LEADER || !active) {
+ return;
+ }
+ TRaftFollowerInfo info = followerInfoMap.get(follower.getNodeId());
+ if (info == null || !info.hasDelayedEntries() ||
info.hasCurrentReplicatingIndex()) {
+ return;
+ }
+ dispatchNextDelayedEntry(follower, info);
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Accessors
+ //
────────────────────────────────────────────────────────────────────────────
+
+ synchronized Peer getLeader() {
+ return configuration.stream()
+ .filter(peer -> peer.getNodeId() == leaderId)
+ .findFirst()
+ .orElse(null);
+ }
+
  /** Returns a defensive copy of the current membership. */
  synchronized List<Peer> getConfiguration() {
    return new ArrayList<>(configuration);
  }
+
  /** Highest log index this node has observed. */
  synchronized long getLogicalClock() {
    return logicalClock.get();
  }
+
  /** True iff this node currently holds the LEADER role. */
  synchronized boolean isLeader() {
    return role == TRaftRole.LEADER;
  }
+
  /** True iff this node is the leader AND is active (i.e. ready to serve writes). */
  synchronized boolean isLeaderReady() {
    return isLeader() && active;
  }
+
  /** Delegates the read-only question to the underlying state machine. */
  synchronized boolean isReadOnly() {
    return stateMachine.isReadOnly();
  }
+
  /** Whether this server participates in writes and elections. */
  synchronized boolean isActive() {
    return active;
  }
+
  /** Enables/disables participation in writes and elections. */
  synchronized void setActive(boolean active) {
    this.active = active;
  }
+
  /** Number of entries observed in the current (maximum) partition. */
  synchronized long getCurrentPartitionIndexCount() {
    return currentPartitionIndexCount;
  }
+
  /** Returns all persisted log entries (delegates to the log store). */
  synchronized List<TRaftLogEntry> getLogEntries() {
    return logStore.getAllEntries();
  }
+
  /** Replication-tracking state for one follower, or null if not tracked. */
  synchronized TRaftFollowerInfo getFollowerInfo(int followerNodeId) {
    return followerInfoMap.get(followerNodeId);
  }
+
+ synchronized long getLatestPartitionIndex() {
+ TRaftLogEntry last = logStore.getLastEntry();
+ return last == null ? INITIAL_PARTITION_INDEX : last.getPartitionIndex();
+ }
+
  /** Partition the leader is currently inserting into. */
  synchronized long getCurrentLeaderInsertingPartitionIndex() {
    return currentLeaderInsertingPartitionIndex;
  }
+
  /** Number of log indices still waiting for quorum ACKs. */
  // NOTE(review): the backing field name contains a typo ("Replicationg"); declared elsewhere
  // in this class, so it is kept as-is here.
  synchronized int getPendingQuorumEntryCount() {
    return currentReplicationgIndicesToAckFollowerCount.size();
  }
+
  /** Hot-swaps the consensus configuration; takes effect on the next use of {@code config}. */
  synchronized void reloadConsensusConfig(TRaftConfig config) {
    this.config = config;
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: log entry construction
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Builds the next log entry for {@code request}, assigning the log index, term, and partition
   * bookkeeping fields.
   *
   * <p>Partition semantics: when the request's timestamp goes backwards relative to the maximum
   * timestamp seen so far, a new partition is opened (partitionIndex + 1, interPartitionIndex
   * reset to 0); otherwise the entry extends the current partition.
   *
   * <p>NOTE(review): in the "new partition" branch {@code historicalMaxTimestamp} is left
   * unchanged, and {@code lastPartitionCount} is taken from the previous entry's
   * interPartitionIndex (an index, not a count) — confirm both are intentional.
   */
  private TRaftLogEntry buildLogEntry(IConsensusRequest request) {
    TRaftLogEntry previous = logStore.getLastEntry();
    // Use 0 as the reference timestamp before anything has ever been written.
    long timestamp =
        TRaftRequestParser.extractTimestamp(
            request, historicalMaxTimestamp == Long.MIN_VALUE ? 0 : historicalMaxTimestamp);
    long logIndex = previous == null ? 1 : previous.getLogIndex() + 1;
    long logTerm = currentTerm;
    long partitionIndex;
    long interPartitionIndex;
    long lastPartitionCount;
    if (previous == null) {
      // Very first entry of this group.
      partitionIndex = INITIAL_PARTITION_INDEX;
      interPartitionIndex = INITIAL_PARTITION_INDEX;
      lastPartitionCount = 0;
      historicalMaxTimestamp = timestamp;
    } else if (timestamp < historicalMaxTimestamp) {
      // Out-of-order timestamp: open a new partition.
      partitionIndex = previous.getPartitionIndex() + 1;
      interPartitionIndex = 0;
      lastPartitionCount = previous.getInterPartitionIndex();
    } else {
      // In-order timestamp: extend the current partition.
      partitionIndex = previous.getPartitionIndex();
      interPartitionIndex = previous.getInterPartitionIndex() + 1;
      lastPartitionCount = previous.getLastPartitionCount();
      historicalMaxTimestamp = Math.max(historicalMaxTimestamp, timestamp);
    }
    return new TRaftLogEntry(
        timestamp,
        partitionIndex,
        logIndex,
        logTerm,
        interPartitionIndex,
        lastPartitionCount,
        TRaftRequestParser.extractRawRequest(request));
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: follower ACK and quorum tracking
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Called when a follower successfully receives and acknowledges {@code ackEntry}.
   *
   * <ol>
   *   <li>Clears the entry from the follower's in-flight and delayed tracking.
   *   <li>Updates the quorum commit counter for this log index.
   *   <li>Chains dispatch of the next pending delayed entry for this follower.
   * </ol>
   */
  private void onFollowerAck(Peer follower, TRaftFollowerInfo info, TRaftLogEntry ackEntry) {
    info.removeCurrentReplicatingIndex(ackEntry.getLogIndex());
    info.removeDelayedIndex(ackEntry.getPartitionIndex(), ackEntry.getLogIndex());
    updateQuorumTracking(ackEntry.getLogIndex());
    // Keep draining this follower's backlog without waiting for the next appender cycle.
    dispatchNextDelayedEntry(follower, info);
  }
+
  /**
   * Increments the ACK counter for {@code logIndex}. When the count reaches at least half the
   * follower count the entry is considered committed and removed from the tracking map.
   *
   * <p>NOTE(review): this counts follower ACKs only — the leader's own persisted copy is
   * implicit, which is why half the FOLLOWER count (not a cluster majority) is the threshold.
   */
  private void updateQuorumTracking(long logIndex) {
    AtomicInteger counter = currentReplicationgIndicesToAckFollowerCount.get(logIndex);
    if (counter == null) {
      // Already committed (or never tracked); nothing to do.
      return;
    }
    int count = counter.incrementAndGet();
    int followerCount = getFollowers().size();
    // Quorum: ACK count >= half of follower count (per the requirement).
    // Using count * 2 >= followerCount avoids floating-point and handles all sizes correctly.
    if (followerCount > 0 && count * 2 >= followerCount) {
      currentReplicationgIndicesToAckFollowerCount.remove(logIndex);
    }
  }
+
  /**
   * Handles a failed transmission: moves the entry from in-flight tracking back into the
   * follower's delayed queue so the background appender retries it later.
   */
  private void onFollowerSendFailed(TRaftFollowerInfo info, TRaftLogEntry failedEntry) {
    info.removeCurrentReplicatingIndex(failedEntry.getLogIndex());
    info.addDelayedIndex(failedEntry.getPartitionIndex(), failedEntry.getLogIndex());
    limitDelayedEntriesIfNecessary(info);
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: disk dispatch
+ //
────────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Attempts to dispatch the next pending delayed log entry for {@code
follower} from the local
+ * disk. After a successful dispatch the chain continues via {@link
#onFollowerAck} so that
+ * subsequent delayed entries in the same (or an advanced) partition are
sent without waiting
+ * for the next appender cycle.
+ */
+ private void dispatchNextDelayedEntry(Peer follower, TRaftFollowerInfo info)
{
+ if (info.hasCurrentReplicatingIndex()) {
+ return;
+ }
+
+ // Advance the follower's current replicating partition if possible.
+ Long firstDelayedPartition = info.getFirstDelayedPartitionIndex();
+ if (firstDelayedPartition == null) {
+ return;
+ }
+ if (firstDelayedPartition > info.getCurrentReplicatingPartitionIndex()) {
+ info.setCurrentReplicatingPartitionIndex(firstDelayedPartition);
+ }
+
+ Long delayedIndex =
+
info.getFirstDelayedIndexOfPartition(info.getCurrentReplicatingPartitionIndex());
+ if (delayedIndex == null) {
+ return;
+ }
+ info.setNextPartitionFirstIndex(delayedIndex);
+
+ TRaftLogEntry diskEntry = logStore.getByIndex(delayedIndex);
+ if (diskEntry == null) {
+ // Entry not yet on disk (race between write path and appender); retry
next cycle.
+ return;
+ }
+
+ info.addCurrentReplicatingIndex(diskEntry.getLogIndex());
+ boolean success = sendEntryToFollower(follower, diskEntry);
+ if (success) {
+ info.recordDiskReplicationSuccess();
+ // ACK handling: remove from in-flight, update quorum, chain next
dispatch.
+ info.removeCurrentReplicatingIndex(diskEntry.getLogIndex());
+ info.removeDelayedIndex(diskEntry.getPartitionIndex(),
diskEntry.getLogIndex());
+ updateQuorumTracking(diskEntry.getLogIndex());
+ // Recurse into the chain until there is nothing left or a send fails.
+ dispatchNextDelayedEntry(follower, info);
+ } else {
+ info.removeCurrentReplicatingIndex(diskEntry.getLogIndex());
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: RPC helpers
+ //
────────────────────────────────────────────────────────────────────────────
+
+ private boolean sendEntryToFollower(Peer follower, TRaftLogEntry entry) {
+ Optional<TRaftServerImpl> followerServer =
TRaftNodeRegistry.resolveServer(follower);
+ if (!followerServer.isPresent()) {
+ return false;
+ }
+ TSStatus status =
+ followerServer
+ .get()
+ .receiveReplicatedLog(entry.copy(), thisNode.getNodeId(),
currentTerm);
+ return isSuccess(status);
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: misc helpers
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Warns when a follower's delayed-entry backlog exceeds the configured limit.
   *
   * <p>NOTE(review): despite the name, this only LOGS — it never evicts entries, so the backlog
   * can grow without bound while a follower stays down. Confirm whether eviction (or renaming
   * to warnIfDelayedEntriesExceedLimit) is intended.
   */
  private void limitDelayedEntriesIfNecessary(TRaftFollowerInfo info) {
    if (info.delayedEntryCount()
        <= config.getReplication().getMaxPendingRetryEntriesPerFollower()) {
      return;
    }
    LOGGER.warn(
        "Delayed TRaft entries for a follower exceed limit {}. Current count: {}",
        config.getReplication().getMaxPendingRetryEntriesPerFollower(),
        info.delayedEntryCount());
  }
+
+ private List<Peer> getFollowers() {
+ List<Peer> followers = new ArrayList<>();
+ for (Peer peer : configuration) {
+ if (peer.getNodeId() != thisNode.getNodeId()) {
+ followers.add(peer);
+ }
+ }
+ return followers;
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: follower info init
+ //
────────────────────────────────────────────────────────────────────────────
+
+ private void initFollowerInfoMap() {
+ followerInfoMap.clear();
+ long latestPartitionIndex = getLatestPartitionIndex();
+ long nextLogIndex = logicalClock.get() + 1;
+ for (Peer follower : getFollowers()) {
+ followerInfoMap.put(
+ follower.getNodeId(), new TRaftFollowerInfo(latestPartitionIndex,
nextLogIndex));
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: appender lifecycle
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Starts one background {@link TRaftLogAppender} per follower, lazily (re)creating the shared
   * executor when it is missing or already shut down.
   */
  private void initAppenders() {
    if (appenderExecutor == null || appenderExecutor.isShutdown()) {
      appenderExecutor = Executors.newCachedThreadPool(
          r -> {
            Thread t = new Thread(r);
            t.setName("TRaftAppender-" + thisNode.getGroupId());
            t.setDaemon(true); // must never block JVM shutdown
            return t;
          });
    }
    long waitMs = config.getReplication().getWaitingReplicationTimeMs();
    for (Peer follower : getFollowers()) {
      TRaftLogAppender appender = new TRaftLogAppender(this, follower, waitMs);
      appenderMap.put(follower.getNodeId(), appender);
      appenderExecutor.submit(appender);
    }
  }
+
  /**
   * Signals all appenders to stop and interrupts their threads via {@code shutdownNow()}.
   *
   * <p>Deliberately non-blocking: this runs while holding the server lock, and appender threads
   * also need that lock for {@code tryReplicateDiskEntriesToFollower()} — waiting here would
   * deadlock. The guards in that method make appenders no-ops as soon as they wake, and they
   * exit on the next loop iteration once they detect the stop flag or an InterruptedException.
   */
  private void stopAppenders() {
    appenderMap.values().forEach(TRaftLogAppender::stop);
    appenderMap.clear();
    if (appenderExecutor != null) {
      appenderExecutor.shutdownNow();
      appenderExecutor = null;
    }
  }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: leader/follower transitions
+ //
────────────────────────────────────────────────────────────────────────────
+
+ private void electInitialLeader() {
+ leaderId =
+ configuration.stream()
+ .map(Peer::getNodeId)
+ .max(Integer::compareTo)
+ .orElse(thisNode.getNodeId());
+ if (leaderId == thisNode.getNodeId()) {
+ role = TRaftRole.LEADER;
+ } else {
+ role = TRaftRole.FOLLOWER;
+ }
+ }
+
  /**
   * Transitions this node to LEADER: resets quorum tracking, rebuilds follower state, and —
   * only if the server has been started — restarts the appenders and notifies the state
   * machine of the leadership change.
   */
  private void becomeLeader() {
    role = TRaftRole.LEADER;
    leaderId = thisNode.getNodeId();
    votedFor = thisNode.getNodeId();
    currentReplicationgIndicesToAckFollowerCount.clear();
    initFollowerInfoMap();
    if (started) {
      stopAppenders();
      initAppenders();
      notifyLeaderChanged();
    }
  }
+
  /**
   * Transitions this node to FOLLOWER of {@code newLeaderId} (-1 when the leader is unknown).
   * If we were leading, the appenders are stopped; state-machine listeners are notified only
   * once the server has been started.
   */
  private void becomeFollower(int newLeaderId) {
    boolean wasLeader = (role == TRaftRole.LEADER);
    role = TRaftRole.FOLLOWER;
    leaderId = newLeaderId;
    if (wasLeader) {
      stopAppenders();
    }
    if (started) {
      notifyLeaderChanged();
    }
  }
+
+ private void notifyLeaderChanged() {
+ if (leaderId != -1) {
+ stateMachine.event().notifyLeaderChanged(thisNode.getGroupId(),
leaderId);
+ }
+ if (role == TRaftRole.LEADER) {
+ stateMachine.event().notifyLeaderReady();
+ } else {
+ stateMachine.event().notifyNotLeader();
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: recovery
+ //
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Rebuilds in-memory state from the persisted log: logical clock, current term, historical
   * maximum timestamp, and partition statistics are folded over every stored entry.
   */
  private void recoverFromDisk() {
    List<TRaftLogEntry> entries = logStore.getAllEntries();
    for (TRaftLogEntry entry : entries) {
      logicalClock.updateAndGet(v -> Math.max(v, entry.getLogIndex()));
      currentTerm = Math.max(currentTerm, entry.getLogTerm());
      historicalMaxTimestamp = Math.max(historicalMaxTimestamp, entry.getTimestamp());
      updatePartitionIndexStat(entry);
    }
    // On recovery, the inserting partition pointer starts at the maximum persisted partition.
    currentLeaderInsertingPartitionIndex = maxPartitionIndex;
  }
+
+ private void updatePartitionIndexStat(TRaftLogEntry entry) {
+ if (entry.getPartitionIndex() > maxPartitionIndex) {
+ maxPartitionIndex = entry.getPartitionIndex();
+ currentPartitionIndexCount = 1;
+ return;
+ }
+ if (entry.getPartitionIndex() == maxPartitionIndex) {
+ currentPartitionIndexCount++;
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: configuration persistence
+ //
────────────────────────────────────────────────────────────────────────────
+
+ private TreeSet<Peer> loadConfigurationFromDisk() throws IOException {
+ TreeSet<Peer> peers = new TreeSet<>();
+ File file = new File(storageDir, CONFIGURATION_FILE_NAME);
+ if (!file.exists()) {
+ return peers;
+ }
+ try (BufferedReader reader = Files.newBufferedReader(file.toPath(),
StandardCharsets.UTF_8)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ String[] split = line.split(",", 3);
+ if (split.length != 3) {
+ continue;
+ }
+ int nodeId = Integer.parseInt(split[0]);
+ String ip = split[1];
+ int port = Integer.parseInt(split[2]);
+ peers.add(new Peer(thisNode.getGroupId(), nodeId, new TEndPoint(ip,
port)));
+ }
+ }
+ return peers;
+ }
+
+ private void persistConfiguration() throws IOException {
+ File file = new File(storageDir, CONFIGURATION_FILE_NAME);
+ try (BufferedWriter writer = Files.newBufferedWriter(file.toPath(),
StandardCharsets.UTF_8)) {
+ for (Peer peer : configuration) {
+ writer.write(
+ String.format(
+ "%s,%s,%s",
+ peer.getNodeId(), peer.getEndpoint().getIp(),
peer.getEndpoint().getPort()));
+ writer.newLine();
+ }
+ }
+ }
+
+ //
────────────────────────────────────────────────────────────────────────────
+ // Private: election utilities
+ //
────────────────────────────────────────────────────────────────────────────
+
+ private int compareCandidateFreshness(
+ long candidatePartitionIndex,
+ long candidatePartitionCount,
+ long localPartitionIndex,
+ long localPartitionCount) {
+ int partitionCompare = Long.compare(candidatePartitionIndex,
localPartitionIndex);
+ if (partitionCompare != 0) {
+ return partitionCompare;
+ }
+ return Long.compare(candidatePartitionCount, localPartitionCount);
+ }
+
+ private boolean isSuccess(TSStatus status) {
+ return status != null
+ && (status.getCode() == 0
+ || status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
similarity index 51%
copy from
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
index cae54c2bcbc..9393f518804 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
@@ -17,31 +17,35 @@
* under the License.
*/
-package org.apache.iotdb.consensus.common.request;
-
-import java.nio.ByteBuffer;
-
-public interface IConsensusRequest {
- /**
- * Serialize all the data to a ByteBuffer.
- *
- * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to
reduce the number of
- * memory copies.
- *
- * <p>To improve efficiency, a specific implementation could return a
DirectByteBuffer to reduce
- * the memory copy required to send an RPC
- *
- * <p>Note: The implementation needs to ensure that the data in the returned
Bytebuffer cannot be
- * changed or an error may occur
- */
- ByteBuffer serializeToByteBuffer();
-
- default long getMemorySize() {
- // return 0 by default
- return 0;
+package org.apache.iotdb.consensus.traft;
+
/** Immutable vote solicitation sent by a candidate during a TRaft election. */
class TRaftVoteRequest {

  // Node id of the candidate asking for the vote.
  private final int candidateId;
  // Election term the candidate is campaigning in.
  private final long term;
  // Candidate's highest partition index — the primary data-freshness criterion for voters.
  private final long partitionIndex;
  // Entry count within the candidate's highest partition — the freshness tie-breaker.
  private final long currentPartitionIndexCount;

  TRaftVoteRequest(int candidateId, long term, long partitionIndex, long currentPartitionIndexCount) {
    this.candidateId = candidateId;
    this.term = term;
    this.partitionIndex = partitionIndex;
    this.currentPartitionIndexCount = currentPartitionIndexCount;
  }

  int getCandidateId() {
    return candidateId;
  }

  long getTerm() {
    return term;
  }

  long getPartitionIndex() {
    return partitionIndex;
  }

  long getCurrentPartitionIndexCount() {
    return currentPartitionIndexCount;
  }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
similarity index 51%
copy from
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
index cae54c2bcbc..6e905c3f88e 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
@@ -17,31 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.consensus.common.request;
+package org.apache.iotdb.consensus.traft;
-import java.nio.ByteBuffer;
/** Immutable reply to a {@link TRaftVoteRequest}: the decision plus the voter's current term. */
class TRaftVoteResult {

  // Whether the voter granted its vote to the candidate.
  private final boolean granted;
  // The voter's current term; lets a stale candidate discover it must step down.
  private final long term;

  TRaftVoteResult(boolean granted, long term) {
    this.granted = granted;
    this.term = term;
  }

  boolean isGranted() {
    return granted;
  }

  long getTerm() {
    return term;
  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index a33cdd11024..7481a261fef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
+import org.apache.iotdb.consensus.config.TRaftConfig;
import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -286,6 +287,7 @@ public class DataRegionConsensusImpl {
CONF.getConnectionTimeoutInMS(),
TimeUnit.MILLISECONDS))
.build())
.build())
+ .setTRaftConfig(TRaftConfig.newBuilder().build())
.build();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 2e517700217..369324b0850 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -259,6 +259,16 @@ public class PipeEnrichedInsertNode extends InsertNode {
return insertNode.getMinTime();
}
  /** Delegates to the wrapped insert node. */
  @Override
  public boolean hasTime() {
    return insertNode.hasTime();
  }

  /** Delegates to the wrapped insert node. */
  @Override
  public long getTime() {
    return insertNode.getTime();
  }
+
@Override
public void markFailedMeasurement(final int index) {
insertNode.markFailedMeasurement(index);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 88a6faa0047..021317b496d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -239,6 +239,7 @@ public abstract class InsertNode extends SearchNode {
switch (config.getDataRegionConsensusProtocolClass()) {
case ConsensusFactory.IOT_CONSENSUS:
case ConsensusFactory.IOT_CONSENSUS_V2:
+ case ConsensusFactory.TRAFT_CONSENSUS:
case ConsensusFactory.RATIS_CONSENSUS:
return isGeneratedByRemoteConsensusLeader;
case ConsensusFactory.SIMPLE_CONSENSUS:
@@ -315,6 +316,11 @@ public abstract class InsertNode extends SearchNode {
public abstract long getMinTime();
  // Every InsertNode carries at least one timestamp, so this defaults to true for all subclasses.
  @Override
  public boolean hasTime() {
    return true;
  }
+
// region partial insert
public void markFailedMeasurement(int index) {
throw new UnsupportedOperationException();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index a2bd6cb1a00..57115b8061a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -208,6 +208,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
this.values = values;
}
+ @Override
public long getTime() {
return time;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 7392b761270..83c35482d90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -301,6 +301,11 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
return visitor.visitInsertRows(this, context);
}
  // The representative timestamp of a multi-row insert is its minimum row time.
  @Override
  public long getTime() {
    return getMinTime();
  }
+
@Override
public long getMinTime() {
return insertRowNodeList.stream()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 39683e5d9f9..7fb2454624c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -449,6 +449,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
columns[index] = null;
}
  // The representative timestamp of a tablet insert is its minimum (first) time.
  @Override
  public long getTime() {
    return getMinTime();
  }
+
@Override
public long getMinTime() {
return times[0];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index 08c78328732..1df485fffea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -108,6 +108,11 @@ public class RelationalInsertRowNode extends InsertRowNode
{
return visitor.visitRelationalInsertRow(this, context);
}
  // NOTE(review): redundant override — merely forwards to super.getTime(); consider removing.
  @Override
  public long getTime() {
    return super.getTime();
  }
+
public static RelationalInsertRowNode deserialize(ByteBuffer byteBuffer) {
RelationalInsertRowNode insertNode = new RelationalInsertRowNode(new
PlanNodeId(""));
insertNode.subDeserialize(byteBuffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 594ccf50471..f316177e263 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -79,6 +79,12 @@ public class RelationalInsertRowsNode extends InsertRowsNode {
return visitor.visitRelationalInsertRows(this, context);
}
+ @Override
+ public long getTime() {
+ return super.getTime();
+ }
+
+
public static RelationalInsertRowsNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId;
List<InsertRowNode> insertRowNodeList = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 257f691e4a7..01ae91d64e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -175,6 +175,11 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
return visitor.visitRelationalInsertTablet(this, context);
}
+ @Override
+ public long getTime() {
+ return super.getTime();
+ }
+
@Override
protected InsertTabletNode getEmptySplit(int count) {
long[] subTimes = new long[count];
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2d7cbe18358..03c06136d5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -240,7 +240,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
if (isFirstStart) {
sendRegisterRequestToConfigNode(true);
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
- IoTDBStartCheck.getInstance().serializeEncryptMagicString();
+// IoTDBStartCheck.getInstance().serializeEncryptMagicString();
ConfigNodeInfo.getInstance().storeConfigNodeList();
// Register this DataNode to the cluster when first start
sendRegisterRequestToConfigNode(false);