This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch research/traft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/traft by this push:
     new 6de79acdbe7 [TRaft]Raft Optimization for Time series Workloads (#17273)
6de79acdbe7 is described below

commit 6de79acdbe7f0576eaf57bf218e7162ea5e37031
Author: Little Health <[email protected]>
AuthorDate: Sun Mar 8 23:06:30 2026 +0800

    [TRaft]Raft Optimization for Time series Workloads (#17273)
---
 .../manager/load/balancer/RouteBalancer.java       |   3 +
 .../procedure/env/RegionMaintainHandler.java       |   4 +-
 .../apache/iotdb/consensus/ConsensusFactory.java   |   1 +
 .../common/request/IConsensusRequest.java          |  14 +
 .../iotdb/consensus/config/ConsensusConfig.java    |  18 +-
 .../apache/iotdb/consensus/config/TRaftConfig.java | 141 +++
 .../iotdb/consensus/traft/TRaftConsensus.java      | 328 +++++++
 .../iotdb/consensus/traft/TRaftFollowerInfo.java   | 153 ++++
 .../iotdb/consensus/traft/TRaftLogAppender.java    |  95 ++
 .../iotdb/consensus/traft/TRaftLogEntry.java       |  89 ++
 .../iotdb/consensus/traft/TRaftLogStore.java       | 155 ++++
 .../iotdb/consensus/traft/TRaftNodeRegistry.java   |  54 ++
 .../iotdb/consensus/traft/TRaftRequestParser.java  |  52 ++
 .../TRaftRole.java}                                |  31 +-
 .../iotdb/consensus/traft/TRaftServerImpl.java     | 985 +++++++++++++++++++++
 .../TRaftVoteRequest.java}                         |  52 +-
 .../TRaftVoteResult.java}                          |  34 +-
 .../db/consensus/DataRegionConsensusImpl.java      |   2 +
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |  10 +
 .../plan/planner/plan/node/write/InsertNode.java   |   6 +
 .../planner/plan/node/write/InsertRowNode.java     |   1 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../planner/plan/node/write/InsertTabletNode.java  |   5 +
 .../plan/node/write/RelationalInsertRowNode.java   |   5 +
 .../plan/node/write/RelationalInsertRowsNode.java  |   6 +
 .../node/write/RelationalInsertTabletNode.java     |   5 +
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 27 files changed, 2181 insertions(+), 75 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 37d86daf153..347a8e5399e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -87,6 +87,8 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
               && 
ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
           || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
               && 
ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
+          || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
+              && 
ConsensusFactory.TRAFT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
           || (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
               && 
ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
           // The simple consensus protocol will always automatically designate 
itself as the leader
@@ -200,6 +202,7 @@ public class RouteBalancer implements 
IClusterStatusSubscriber {
                 newLeaderId);
             switch (consensusProtocolClass) {
               case ConsensusFactory.IOT_CONSENSUS:
+              case ConsensusFactory.TRAFT_CONSENSUS:
               case ConsensusFactory.SIMPLE_CONSENSUS:
                 // For IoTConsensus or SimpleConsensus protocol, change
                 // RegionRouteMap is enough
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index f827adea5d4..e9ae21e9492 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -91,6 +91,7 @@ import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
 import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.TRAFT_CONSENSUS;
 
 public class RegionMaintainHandler {
 
@@ -171,7 +172,8 @@ public class RegionMaintainHandler {
     List<TDataNodeLocation> currentPeerNodes;
     if (TConsensusGroupType.DataRegion.equals(regionId.getType())
         && (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())
-            || 
IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) {
+            || 
IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())
+            || 
TRAFT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()))) {
       // parameter of createPeer for MultiLeader should be all peers
       currentPeerNodes = new ArrayList<>(regionReplicaNodes);
       currentPeerNodes.add(destDataNode);
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 9d7a30279ad..1cd071e871b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -37,6 +37,7 @@ public class ConsensusFactory {
   public static final String SIMPLE_CONSENSUS = 
"org.apache.iotdb.consensus.simple.SimpleConsensus";
   public static final String RATIS_CONSENSUS = 
"org.apache.iotdb.consensus.ratis.RatisConsensus";
   public static final String IOT_CONSENSUS = 
"org.apache.iotdb.consensus.iot.IoTConsensus";
+  public static final String TRAFT_CONSENSUS = 
"org.apache.iotdb.consensus.traft.TRaftConsensus";
   public static final String REAL_PIPE_CONSENSUS = 
"org.apache.iotdb.consensus.pipe.PipeConsensus";
   public static final String IOT_CONSENSUS_V2 = 
"org.apache.iotdb.consensus.iot.IoTConsensusV2";
   public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch";
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index cae54c2bcbc..6079ac75505 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -36,6 +36,20 @@ public interface IConsensusRequest {
    */
   ByteBuffer serializeToByteBuffer();
 
+  default boolean hasTime() {
+    return false;
+  }
+
+  /**
+   * Return the primary timestamp carried by this request.
+   *
+   * <p>Callers should check {@link #hasTime()} before calling this method.
+   */
+  default long getTime() {
+    throw new UnsupportedOperationException(
+        String.format("%s does not carry timestamp", getClass().getName()));
+  }
+
   default long getMemorySize() {
     // return 0 by default
     return 0;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index dba53214abd..10109ec06b5 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -33,6 +33,7 @@ public class ConsensusConfig {
   private final RatisConfig ratisConfig;
   private final IoTConsensusConfig iotConsensusConfig;
   private final PipeConsensusConfig pipeConsensusConfig;
+  private final TRaftConfig tRaftConfig;
 
   private ConsensusConfig(
       TEndPoint thisNode,
@@ -41,7 +42,8 @@ public class ConsensusConfig {
       TConsensusGroupType consensusGroupType,
       RatisConfig ratisConfig,
       IoTConsensusConfig iotConsensusConfig,
-      PipeConsensusConfig pipeConsensusConfig) {
+      PipeConsensusConfig pipeConsensusConfig,
+      TRaftConfig tRaftConfig) {
     this.thisNodeEndPoint = thisNode;
     this.thisNodeId = thisNodeId;
     this.storageDir = storageDir;
@@ -49,6 +51,7 @@ public class ConsensusConfig {
     this.ratisConfig = ratisConfig;
     this.iotConsensusConfig = iotConsensusConfig;
     this.pipeConsensusConfig = pipeConsensusConfig;
+    this.tRaftConfig = tRaftConfig;
   }
 
   public TEndPoint getThisNodeEndPoint() {
@@ -79,6 +82,10 @@ public class ConsensusConfig {
     return pipeConsensusConfig;
   }
 
+  public TRaftConfig getTRaftConfig() {
+    return tRaftConfig;
+  }
+
   public static ConsensusConfig.Builder newBuilder() {
     return new ConsensusConfig.Builder();
   }
@@ -92,6 +99,7 @@ public class ConsensusConfig {
     private RatisConfig ratisConfig;
     private IoTConsensusConfig iotConsensusConfig;
     private PipeConsensusConfig pipeConsensusConfig;
+    private TRaftConfig tRaftConfig;
 
     public ConsensusConfig build() {
       return new ConsensusConfig(
@@ -103,7 +111,8 @@ public class ConsensusConfig {
           Optional.ofNullable(iotConsensusConfig)
               .orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
           Optional.ofNullable(pipeConsensusConfig)
-              .orElseGet(() -> PipeConsensusConfig.newBuilder().build()));
+              .orElseGet(() -> PipeConsensusConfig.newBuilder().build()),
+          Optional.ofNullable(tRaftConfig).orElseGet(() -> 
TRaftConfig.newBuilder().build()));
     }
 
     public Builder setThisNode(TEndPoint thisNode) {
@@ -140,5 +149,10 @@ public class ConsensusConfig {
       this.pipeConsensusConfig = pipeConsensusConfig;
       return this;
     }
+
+    public Builder setTRaftConfig(TRaftConfig tRaftConfig) {
+      this.tRaftConfig = tRaftConfig;
+      return this;
+    }
   }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
new file mode 100644
index 00000000000..25296b1f107
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/TRaftConfig.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import java.util.Optional;
+
+public class TRaftConfig {
+
+  private final Replication replication;
+  private final Election election;
+
+  private TRaftConfig(Replication replication, Election election) {
+    this.replication = replication;
+    this.election = election;
+  }
+
+  public Replication getReplication() {
+    return replication;
+  }
+
+  public Election getElection() {
+    return election;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private Replication replication;
+    private Election election;
+
+    public Builder setReplication(Replication replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    public Builder setElection(Election election) {
+      this.election = election;
+      return this;
+    }
+
+    public TRaftConfig build() {
+      return new TRaftConfig(
+          Optional.ofNullable(replication).orElseGet(() -> 
Replication.newBuilder().build()),
+          Optional.ofNullable(election).orElseGet(() -> 
Election.newBuilder().build()));
+    }
+  }
+
+  public static class Replication {
+
+    private final int maxPendingRetryEntriesPerFollower;
+    private final long waitingReplicationTimeMs;
+
+    private Replication(int maxPendingRetryEntriesPerFollower, long 
waitingReplicationTimeMs) {
+      this.maxPendingRetryEntriesPerFollower = 
maxPendingRetryEntriesPerFollower;
+      this.waitingReplicationTimeMs = waitingReplicationTimeMs;
+    }
+
+    public int getMaxPendingRetryEntriesPerFollower() {
+      return maxPendingRetryEntriesPerFollower;
+    }
+
+    public long getWaitingReplicationTimeMs() {
+      return waitingReplicationTimeMs;
+    }
+
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    public static class Builder {
+
+      private int maxPendingRetryEntriesPerFollower = 100_000;
+      private long waitingReplicationTimeMs = 1L;
+
+      public Builder setMaxPendingRetryEntriesPerFollower(int 
maxPendingRetryEntriesPerFollower) {
+        this.maxPendingRetryEntriesPerFollower = 
maxPendingRetryEntriesPerFollower;
+        return this;
+      }
+
+      public Builder setWaitingReplicationTimeMs(long 
waitingReplicationTimeMs) {
+        this.waitingReplicationTimeMs = waitingReplicationTimeMs;
+        return this;
+      }
+
+      public Replication build() {
+        return new Replication(maxPendingRetryEntriesPerFollower, 
waitingReplicationTimeMs);
+      }
+    }
+  }
+
+  public static class Election {
+
+    private final long randomSeed;
+
+    private Election(long randomSeed) {
+      this.randomSeed = randomSeed;
+    }
+
+    public long getRandomSeed() {
+      return randomSeed;
+    }
+
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    public static class Builder {
+
+      private long randomSeed = System.nanoTime();
+
+      public Builder setRandomSeed(long randomSeed) {
+        this.randomSeed = randomSeed;
+        return this;
+      }
+
+      public Election build() {
+        return new Election(randomSeed);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
new file mode 100644
index 00000000000..db4d9875f94
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftConsensus.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.TRaftConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TRaftConsensus implements IConsensus {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TRaftConsensus.class);
+
+  private final TEndPoint thisNode;
+  private final int thisNodeId;
+  private final File storageDir;
+  private final IStateMachine.Registry registry;
+  private final Map<ConsensusGroupId, TRaftServerImpl> stateMachineMap = new 
ConcurrentHashMap<>();
+
+  private TRaftConfig config;
+  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
+  public TRaftConsensus(ConsensusConfig config, IStateMachine.Registry 
registry) {
+    this.thisNode = config.getThisNodeEndPoint();
+    this.thisNodeId = config.getThisNodeId();
+    this.storageDir = new File(config.getStorageDir());
+    this.registry = registry;
+    this.config = config.getTRaftConfig();
+  }
+
+  @Override
+  public synchronized void start() throws IOException {
+    initAndRecover();
+    stateMachineMap.values().forEach(TRaftServerImpl::start);
+    TRaftNodeRegistry.register(thisNode, this);
+  }
+
+  @Override
+  public synchronized void stop() {
+    TRaftNodeRegistry.unregister(thisNode);
+    stateMachineMap.values().forEach(TRaftServerImpl::stop);
+  }
+
+  @Override
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.isReadOnly()) {
+      throw new ConsensusException("Current peer is read-only");
+    }
+    return impl.write(request);
+  }
+
+  @Override
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    return impl.read(request);
+  }
+
+  @Override
+  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) 
throws ConsensusException {
+    List<Peer> effectivePeers = peers;
+    if (effectivePeers == null || effectivePeers.isEmpty()) {
+      effectivePeers =
+          Collections.singletonList(new Peer(groupId, thisNodeId, thisNode));
+    }
+    if (!effectivePeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
+      throw new IllegalPeerEndpointException(thisNode, effectivePeers);
+    }
+    if (effectivePeers.size() < 1) {
+      throw new IllegalPeerNumException(effectivePeers.size());
+    }
+    final List<Peer> finalPeers = effectivePeers;
+    AtomicBoolean alreadyExists = new AtomicBoolean(true);
+    Optional.ofNullable(
+            stateMachineMap.computeIfAbsent(
+                groupId,
+                key -> {
+                  alreadyExists.set(false);
+                  File peerDir = new File(buildPeerDir(groupId));
+                  if (!peerDir.exists() && !peerDir.mkdirs()) {
+                    LOGGER.warn("Failed to create TRaft peer dir {}", peerDir);
+                    return null;
+                  }
+                  try {
+                    return new TRaftServerImpl(
+                        peerDir.getAbsolutePath(),
+                        new Peer(groupId, thisNodeId, thisNode),
+                        new TreeSet<>(finalPeers),
+                        registry.apply(groupId),
+                        config);
+                  } catch (IOException e) {
+                    LOGGER.error("Failed to create TRaft server for {}", 
groupId, e);
+                    return null;
+                  }
+                }))
+        .map(
+            impl -> {
+              impl.start();
+              return impl;
+            })
+        .orElseThrow(
+            () -> new ConsensusException(String.format("Failed to create local 
peer %s", groupId)));
+    if (alreadyExists.get()) {
+      throw new ConsensusGroupAlreadyExistException(groupId);
+    }
+  }
+
+  @Override
+  public void deleteLocalPeer(ConsensusGroupId groupId) throws 
ConsensusException {
+    AtomicBoolean exist = new AtomicBoolean(false);
+    stateMachineMap.computeIfPresent(
+        groupId,
+        (key, value) -> {
+          exist.set(true);
+          value.stop();
+          FileUtils.deleteFileOrDirectory(new File(buildPeerDir(groupId)));
+          return null;
+        });
+    if (!exist.get()) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  @Override
+  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.getConfiguration().contains(peer)) {
+      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+    }
+    try {
+      impl.addPeer(peer);
+    } catch (IOException e) {
+      throw new ConsensusException("Failed to add peer in TRaft", e);
+    }
+  }
+
+  @Override
+  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (!impl.getConfiguration().contains(peer)) {
+      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
+    }
+    try {
+      impl.removePeer(peer);
+    } catch (IOException e) {
+      throw new ConsensusException("Failed to remove peer in TRaft", e);
+    }
+  }
+
+  @Override
+  public void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, 
List<Peer>> correctPeerList) {
+    this.correctPeerListBeforeStart = correctPeerList;
+  }
+
+  @Override
+  public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
+      throws ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    try {
+      impl.resetPeerList(correctPeers);
+    } catch (IOException e) {
+      throw new ConsensusException("Failed to reset peer list in TRaft", e);
+    }
+  }
+
+  @Override
+  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException {
+    TRaftServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    impl.transferLeader(newLeader);
+  }
+
+  @Override
+  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws 
ConsensusException {
+    throw new ConsensusException("TRaft does not support snapshot trigger 
currently");
+  }
+
+  @Override
+  public boolean isLeader(ConsensusGroupId groupId) {
+    return 
Optional.ofNullable(stateMachineMap.get(groupId)).map(TRaftServerImpl::isLeader).orElse(false);
+  }
+
+  @Override
+  public long getLogicalClock(ConsensusGroupId groupId) {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .map(TRaftServerImpl::getLogicalClock)
+        .orElse(0L);
+  }
+
+  @Override
+  public boolean isLeaderReady(ConsensusGroupId groupId) {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .map(TRaftServerImpl::isLeaderReady)
+        .orElse(false);
+  }
+
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    return 
Optional.ofNullable(stateMachineMap.get(groupId)).map(TRaftServerImpl::getLeader).orElse(null);
+  }
+
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .map(TRaftServerImpl::getConfiguration)
+        .map(List::size)
+        .orElse(0);
+  }
+
+  @Override
+  public List<ConsensusGroupId> getAllConsensusGroupIds() {
+    return new ArrayList<>(stateMachineMap.keySet());
+  }
+
+  @Override
+  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
+    return buildPeerDir(groupId);
+  }
+
+  @Override
+  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
+    this.config = consensusConfig.getTRaftConfig();
+    stateMachineMap.values().forEach(server -> 
server.reloadConsensusConfig(config));
+  }
+
+  TRaftServerImpl getImpl(ConsensusGroupId groupId) {
+    return stateMachineMap.get(groupId);
+  }
+
+  private void initAndRecover() throws IOException {
+    if (!storageDir.exists() && !storageDir.mkdirs()) {
+      throw new IOException(String.format("Unable to create consensus dir at 
%s", storageDir));
+    }
+    try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
+      for (Path path : stream) {
+        String[] items = path.getFileName().toString().split("_");
+        if (items.length != 2) {
+          continue;
+        }
+        ConsensusGroupId consensusGroupId =
+            ConsensusGroupId.Factory.create(
+                Integer.parseInt(items[0]), Integer.parseInt(items[1]));
+        TRaftServerImpl consensus =
+            new TRaftServerImpl(
+                path.toString(),
+                new Peer(consensusGroupId, thisNodeId, thisNode),
+                new TreeSet<>(),
+                registry.apply(consensusGroupId),
+                config);
+        stateMachineMap.put(consensusGroupId, consensus);
+      }
+    }
+    if (correctPeerListBeforeStart != null) {
+      for (Map.Entry<ConsensusGroupId, List<Peer>> entry : 
correctPeerListBeforeStart.entrySet()) {
+        TRaftServerImpl impl = stateMachineMap.get(entry.getKey());
+        if (impl == null) {
+          continue;
+        }
+        impl.resetPeerList(entry.getValue());
+      }
+    }
+  }
+
+  private String buildPeerDir(ConsensusGroupId groupId) {
+    return storageDir + File.separator + groupId.getType().getValue() + "_" + 
groupId.getId();
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
new file mode 100644
index 00000000000..15aa3f01470
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftFollowerInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+public class TRaftFollowerInfo {
+
+  private long currentReplicatingPartitionIndex;
+  private final Set<Long> currentReplicatingIndices = new HashSet<>();
+  private long nextPartitionIndex;
+  private long nextPartitionFirstIndex;
+  private long memoryReplicationSuccessCount;
+  private long diskReplicationSuccessCount;
+
+  // Keep delayed entries grouped by partition so we can replay one-by-one 
from disk.
+  private final NavigableMap<Long, NavigableSet<Long>> 
delayedIndicesByPartition = new TreeMap<>();
+
+  public TRaftFollowerInfo(long currentReplicatingPartitionIndex, long 
nextPartitionFirstIndex) {
+    this.currentReplicatingPartitionIndex = currentReplicatingPartitionIndex;
+    this.nextPartitionIndex = currentReplicatingPartitionIndex;
+    this.nextPartitionFirstIndex = nextPartitionFirstIndex;
+  }
+
+  public long getCurrentReplicatingPartitionIndex() {
+    return currentReplicatingPartitionIndex;
+  }
+
+  public void setCurrentReplicatingPartitionIndex(long 
currentReplicatingPartitionIndex) {
+    this.currentReplicatingPartitionIndex = currentReplicatingPartitionIndex;
+  }
+
+  public Set<Long> getCurrentReplicatingIndices() {
+    return currentReplicatingIndices;
+  }
+
+  public long getNextPartitionIndex() {
+    return nextPartitionIndex;
+  }
+
+  public void setNextPartitionIndex(long nextPartitionIndex) {
+    this.nextPartitionIndex = nextPartitionIndex;
+  }
+
+  public long getNextPartitionFirstIndex() {
+    return nextPartitionFirstIndex;
+  }
+
+  public void setNextPartitionFirstIndex(long nextPartitionFirstIndex) {
+    this.nextPartitionFirstIndex = nextPartitionFirstIndex;
+  }
+
+  public void addCurrentReplicatingIndex(long index) {
+    currentReplicatingIndices.add(index);
+  }
+
+  public void removeCurrentReplicatingIndex(long index) {
+    currentReplicatingIndices.remove(index);
+  }
+
+  public boolean hasCurrentReplicatingIndex() {
+    return !currentReplicatingIndices.isEmpty();
+  }
+
+  public long getMemoryReplicationSuccessCount() {
+    return memoryReplicationSuccessCount;
+  }
+
+  public long getDiskReplicationSuccessCount() {
+    return diskReplicationSuccessCount;
+  }
+
+  public void recordMemoryReplicationSuccess() {
+    memoryReplicationSuccessCount++;
+  }
+
+  public void recordDiskReplicationSuccess() {
+    diskReplicationSuccessCount++;
+  }
+
+  public void addDelayedIndex(long partitionIndex, long logIndex) {
+    delayedIndicesByPartition
+        .computeIfAbsent(partitionIndex, key -> new TreeSet<>())
+        .add(logIndex);
+    refreshNextPartitionPointers();
+  }
+
+  public void removeDelayedIndex(long partitionIndex, long logIndex) {
+    NavigableSet<Long> indices = delayedIndicesByPartition.get(partitionIndex);
+    if (indices == null) {
+      return;
+    }
+    indices.remove(logIndex);
+    if (indices.isEmpty()) {
+      delayedIndicesByPartition.remove(partitionIndex);
+    }
+    refreshNextPartitionPointers();
+  }
+
+  public Long getFirstDelayedPartitionIndex() {
+    return delayedIndicesByPartition.isEmpty() ? null : 
delayedIndicesByPartition.firstKey();
+  }
+
+  public Long getFirstDelayedIndexOfPartition(long partitionIndex) {
+    NavigableSet<Long> indices = delayedIndicesByPartition.get(partitionIndex);
+    return indices == null || indices.isEmpty() ? null : indices.first();
+  }
+
+  public boolean hasDelayedEntries() {
+    return !delayedIndicesByPartition.isEmpty();
+  }
+
+  public long delayedEntryCount() {
+    long count = 0;
+    for (Map.Entry<Long, NavigableSet<Long>> entry : 
delayedIndicesByPartition.entrySet()) {
+      count += entry.getValue().size();
+    }
+    return count;
+  }
+
+  private void refreshNextPartitionPointers() {
+    if (delayedIndicesByPartition.isEmpty()) {
+      nextPartitionIndex = currentReplicatingPartitionIndex;
+      return;
+    }
+    Map.Entry<Long, NavigableSet<Long>> first = 
delayedIndicesByPartition.firstEntry();
+    nextPartitionIndex = first.getKey();
+    nextPartitionFirstIndex = first.getValue().first();
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
new file mode 100644
index 00000000000..c5bb758654f
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogAppender.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.consensus.common.Peer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Per-follower background appender thread.
+ *
+ * <p>Runs an infinite loop that polls every {@code waitingReplicationTimeMs} 
milliseconds and calls
+ * back into {@link TRaftServerImpl#tryReplicateDiskEntriesToFollower} to 
dispatch any log entries
+ * that are on disk but have not yet been sent to the target follower. This 
handles two scenarios:
+ *
+ * <ul>
+ *   <li>Entries whose partition index was higher than the leader's current 
inserting partition at
+ *       write time (the entry went straight to disk and was deferred).
+ *   <li>Entries that could not be sent synchronously because the follower was 
offline during the
+ *       original write, or because the previous partition's quorum commit is 
still pending.
+ * </ul>
+ */
+class TRaftLogAppender implements Runnable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TRaftLogAppender.class);
+
+  private final TRaftServerImpl server;
+  private final Peer follower;
+  private final long waitingReplicationTimeMs;
+
+  private volatile boolean stopped = false;
+  private final CountDownLatch runFinished = new CountDownLatch(1);
+
+  TRaftLogAppender(TRaftServerImpl server, Peer follower, long 
waitingReplicationTimeMs) {
+    this.server = server;
+    this.follower = follower;
+    this.waitingReplicationTimeMs = waitingReplicationTimeMs;
+  }
+
+  /** Signal the appender to stop after the current iteration completes. */
+  void stop() {
+    stopped = true;
+  }
+
+  /**
+   * Block until the appender thread has exited, up to {@code timeoutSeconds} 
seconds.
+   *
+   * @return {@code true} if the thread exited within the timeout, {@code 
false} otherwise.
+   */
+  boolean awaitTermination(long timeoutSeconds) throws InterruptedException {
+    return runFinished.await(timeoutSeconds, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void run() {
+    LOGGER.info("TRaftLogAppender started for follower {}", follower);
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          // Attempt to replicate any disk entries that are ready for this 
follower.
+          server.tryReplicateDiskEntriesToFollower(follower);
+        } catch (Exception e) {
+          LOGGER.warn("Error during disk replication to {}: {}", follower, 
e.getMessage(), e);
+        }
+        TimeUnit.MILLISECONDS.sleep(waitingReplicationTimeMs);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      runFinished.countDown();
+      LOGGER.info("TRaftLogAppender stopped for follower {}", follower);
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
new file mode 100644
index 00000000000..66542177124
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogEntry.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import java.util.Arrays;
+
/**
 * Immutable metadata plus payload for a single TRaft log entry.
 *
 * <p>The {@code data} array is stored and returned <em>without</em> defensive copying to avoid
 * extra allocations on the write path; callers that need an independent snapshot should use
 * {@link #copy()}.
 */
public class TRaftLogEntry {

  private final long timestamp;
  private final long partitionIndex;
  private final long logIndex;
  private final long logTerm;
  private final long interPartitionIndex;
  private final long lastPartitionCount;
  private final byte[] data;

  public TRaftLogEntry(
      long timestamp,
      long partitionIndex,
      long logIndex,
      long logTerm,
      long interPartitionIndex,
      long lastPartitionCount,
      byte[] data) {
    this.timestamp = timestamp;
    this.partitionIndex = partitionIndex;
    this.logIndex = logIndex;
    this.logTerm = logTerm;
    this.interPartitionIndex = interPartitionIndex;
    this.lastPartitionCount = lastPartitionCount;
    this.data = data;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public long getPartitionIndex() {
    return partitionIndex;
  }

  public long getLogIndex() {
    return logIndex;
  }

  public long getLogTerm() {
    return logTerm;
  }

  public long getInterPartitionIndex() {
    return interPartitionIndex;
  }

  /** Returns the internal payload array (not a copy); do not mutate it. */
  public byte[] getData() {
    return data;
  }

  public long getLastPartitionCount() {
    return lastPartitionCount;
  }

  /** Returns a deep copy whose payload array is independent of this entry's. */
  public TRaftLogEntry copy() {
    return new TRaftLogEntry(
        timestamp,
        partitionIndex,
        logIndex,
        logTerm,
        interPartitionIndex,
        lastPartitionCount,
        Arrays.copyOf(data, data.length));
  }

  /** Payload bytes are elided (length only) to keep log output small. */
  @Override
  public String toString() {
    return "TRaftLogEntry{logIndex="
        + logIndex
        + ", logTerm="
        + logTerm
        + ", timestamp="
        + timestamp
        + ", partitionIndex="
        + partitionIndex
        + ", interPartitionIndex="
        + interPartitionIndex
        + ", lastPartitionCount="
        + lastPartitionCount
        + ", dataLength="
        + (data == null ? 0 : data.length)
        + '}';
  }
}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
new file mode 100644
index 00000000000..0a578a22fcb
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftLogStore.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+class TRaftLogStore {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TRaftLogStore.class);
+  private static final String LOG_FILE_NAME = "traft.log";
+
+  private final File logFile;
+  private final Map<Long, TRaftLogEntry> logEntries = new LinkedHashMap<>();
+
+  TRaftLogStore(String storageDir) throws IOException {
+    this.logFile = new File(storageDir, LOG_FILE_NAME);
+    File parent = logFile.getParentFile();
+    if (parent != null && !parent.exists() && !parent.mkdirs()) {
+      throw new IOException(String.format("Failed to create TRaft dir %s", 
parent));
+    }
+    if (!logFile.exists() && !logFile.createNewFile()) {
+      throw new IOException(String.format("Failed to create TRaft log file 
%s", logFile));
+    }
+    loadAll();
+  }
+
+  synchronized void append(TRaftLogEntry entry) throws IOException {
+    if (logEntries.containsKey(entry.getLogIndex())) {
+      return;
+    }
+    try (BufferedWriter writer =
+        Files.newBufferedWriter(
+            logFile.toPath(), StandardCharsets.UTF_8, 
StandardOpenOption.APPEND)) {
+      writer.write(serialize(entry));
+      writer.newLine();
+    }
+    logEntries.put(entry.getLogIndex(), entry.copy());
+  }
+
+  synchronized TRaftLogEntry getByIndex(long index) {
+    TRaftLogEntry entry = logEntries.get(index);
+    return entry == null ? null : entry.copy();
+  }
+
+  synchronized boolean contains(long index) {
+    return logEntries.containsKey(index);
+  }
+
+  synchronized TRaftLogEntry getLastEntry() {
+    if (logEntries.isEmpty()) {
+      return null;
+    }
+    TRaftLogEntry last = null;
+    for (TRaftLogEntry entry : logEntries.values()) {
+      last = entry;
+    }
+    return last == null ? null : last.copy();
+  }
+
+  synchronized List<TRaftLogEntry> getAllEntries() {
+    List<TRaftLogEntry> result = new ArrayList<>(logEntries.size());
+    for (TRaftLogEntry entry : logEntries.values()) {
+      result.add(entry.copy());
+    }
+    return Collections.unmodifiableList(result);
+  }
+
+  private void loadAll() throws IOException {
+    try (BufferedReader reader = Files.newBufferedReader(logFile.toPath(), 
StandardCharsets.UTF_8)) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (line.isEmpty()) {
+          continue;
+        }
+        TRaftLogEntry entry = deserialize(line);
+        logEntries.put(entry.getLogIndex(), entry);
+      }
+    }
+  }
+
+  private String serialize(TRaftLogEntry entry) {
+    return entry.getLogIndex()
+        + ","
+        + entry.getLogTerm()
+        + ","
+        + entry.getTimestamp()
+        + ","
+        + entry.getPartitionIndex()
+        + ","
+        + entry.getInterPartitionIndex()
+        + ","
+        + entry.getLastPartitionCount()
+        + ","
+        + Base64.getEncoder().encodeToString(entry.getData());
+  }
+
+  private TRaftLogEntry deserialize(String line) {
+    String[] fields = line.split(",", 7);
+    if (fields.length != 7) {
+      throw new IllegalArgumentException("Invalid TRaft log line: " + line);
+    }
+    try {
+      long logIndex = Long.parseLong(fields[0]);
+      long logTerm = Long.parseLong(fields[1]);
+      long timestamp = Long.parseLong(fields[2]);
+      long partitionIndex = Long.parseLong(fields[3]);
+      long interPartitionIndex = Long.parseLong(fields[4]);
+      long lastPartitionCount = Long.parseLong(fields[5]);
+      byte[] data = Base64.getDecoder().decode(fields[6]);
+      return new TRaftLogEntry(
+          timestamp,
+          partitionIndex,
+          logIndex,
+          logTerm,
+          interPartitionIndex,
+          lastPartitionCount,
+          data);
+    } catch (Exception e) {
+      LOGGER.error("Failed to parse TRaft log line {}", line, e);
+      throw e;
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
new file mode 100644
index 00000000000..098dcb7a4ed
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftNodeRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.common.Peer;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+class TRaftNodeRegistry {
+
+  private static final Map<String, TRaftConsensus> CONSENSUS_BY_ENDPOINT = new 
ConcurrentHashMap<>();
+
+  private TRaftNodeRegistry() {}
+
+  static void register(TEndPoint endpoint, TRaftConsensus consensus) {
+    CONSENSUS_BY_ENDPOINT.put(toEndpointKey(endpoint), consensus);
+  }
+
+  static void unregister(TEndPoint endpoint) {
+    CONSENSUS_BY_ENDPOINT.remove(toEndpointKey(endpoint));
+  }
+
+  static Optional<TRaftServerImpl> resolveServer(Peer peer) {
+    TRaftConsensus consensus = 
CONSENSUS_BY_ENDPOINT.get(toEndpointKey(peer.getEndpoint()));
+    if (consensus == null) {
+      return Optional.empty();
+    }
+    return Optional.ofNullable(consensus.getImpl(peer.getGroupId()));
+  }
+
+  private static String toEndpointKey(TEndPoint endpoint) {
+    return endpoint.getIp() + ":" + endpoint.getPort();
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
new file mode 100644
index 00000000000..2ce516cedf9
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRequestParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+
+import java.nio.ByteBuffer;
+
+class TRaftRequestParser {
+
+  private TRaftRequestParser() {}
+
+  static long extractTimestamp(IConsensusRequest request, long 
fallbackTimestamp) {
+    if (request.hasTime()) {
+      return request.getTime();
+    }
+    ByteBuffer buffer = request.serializeToByteBuffer().duplicate();
+    if (buffer.remaining() < Long.BYTES) {
+      return fallbackTimestamp;
+    }
+    return buffer.getLong(buffer.position());
+  }
+
+  static byte[] extractRawRequest(IConsensusRequest request) {
+    ByteBuffer buffer = request.serializeToByteBuffer().duplicate();
+    byte[] result = new byte[buffer.remaining()];
+    buffer.get(result);
+    return result;
+  }
+
+  static IConsensusRequest buildRequest(byte[] rawRequest) {
+    return new ByteBufferConsensusRequest(ByteBuffer.wrap(rawRequest));
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
similarity index 50%
copy from 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
index cae54c2bcbc..39ff57143c9 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftRole.java
@@ -17,31 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.common.request;
+package org.apache.iotdb.consensus.traft;
 
-import java.nio.ByteBuffer;
-
-public interface IConsensusRequest {
-  /**
-   * Serialize all the data to a ByteBuffer.
-   *
-   * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to 
reduce the number of
-   * memory copies.
-   *
-   * <p>To improve efficiency, a specific implementation could return a 
DirectByteBuffer to reduce
-   * the memory copy required to send an RPC
-   *
-   * <p>Note: The implementation needs to ensure that the data in the returned 
Bytebuffer cannot be
-   * changed or an error may occur
-   */
-  ByteBuffer serializeToByteBuffer();
-
-  default long getMemorySize() {
-    // return 0 by default
-    return 0;
-  }
-
-  default void markAsGeneratedByRemoteConsensusLeader() {
-    // do nothing by default
-  }
/** Raft role of a TRaft node within its consensus group. */
public enum TRaftRole {
  // Accepts client writes and drives replication to followers.
  LEADER,
  // Receives and persists log entries replicated by the leader.
  FOLLOWER,
  // Transitional state while competing for leadership.
  CANDIDATE
}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
new file mode 100644
index 00000000000..d48dbe35f3b
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftServerImpl.java
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.traft;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.TRaftConfig;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+class TRaftServerImpl {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TRaftServerImpl.class);
+  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+  private static final long INITIAL_PARTITION_INDEX = 0;
+
+  // ── persistent / structural state 
───────────────────────────────────────────
+  private final String storageDir;
+  private final Peer thisNode;
+  private final IStateMachine stateMachine;
+  private final TreeSet<Peer> configuration = new TreeSet<>();
+  private final ConcurrentHashMap<Integer, TRaftFollowerInfo> followerInfoMap =
+      new ConcurrentHashMap<>();
+  private final TRaftLogStore logStore;
+  private final AtomicLong logicalClock = new AtomicLong(0);
+  private final Random random;
+
+  // ── volatile node state 
──────────────────────────────────────────────────────
+  private volatile TRaftRole role = TRaftRole.FOLLOWER;
+  private volatile boolean active = true;
+  private volatile boolean started = false;
+
+  // ── term / election state 
────────────────────────────────────────────────────
+  private long currentTerm = 0;
+  private int leaderId = -1;
+  private int votedFor = -1;
+
+  // ── partition tracking 
───────────────────────────────────────────────────────
+  private long historicalMaxTimestamp = Long.MIN_VALUE;
+  private long maxPartitionIndex = INITIAL_PARTITION_INDEX;
+  private long currentPartitionIndexCount = 0;
+
+  /**
+   * The partition index of entries currently being actively inserted by the 
leader (fast path).
+   * Entries arriving with a partition index equal to this value are eligible 
for direct
+   * in-memory replication to matching followers. Entries with a higher 
partition index are
+   * written directly to disk and replicated by the per-follower {@link 
TRaftLogAppender}.
+   */
+  private long currentLeaderInsertingPartitionIndex = INITIAL_PARTITION_INDEX;
+
+  // ── quorum commit tracking 
───────────────────────────────────────────────────
+  /**
+   * Tracks, per log index, how many followers have acknowledged that entry. 
Once the ACK count
+   * reaches ⌈followerCount/2⌉ the entry is considered committed and removed 
from the map.
+   * The map becoming empty signals that all entries of the current partition 
have achieved
+   * quorum, allowing the next partition's entries to be transmitted.
+   */
+  private final ConcurrentHashMap<Long, AtomicInteger> 
currentReplicationgIndicesToAckFollowerCount =
+      new ConcurrentHashMap<>();
+
+  // ── background appenders 
─────────────────────────────────────────────────────
+  /** One background appender thread per follower, keyed by follower node-id. 
*/
+  private final ConcurrentHashMap<Integer, TRaftLogAppender> appenderMap =
+      new ConcurrentHashMap<>();
+
+  private ExecutorService appenderExecutor;
+
+  private TRaftConfig config;
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Construction
+  // 
────────────────────────────────────────────────────────────────────────────
+
  /**
   * Builds a TRaft server for one consensus group.
   *
   * @param storageDir directory holding the log file and persisted configuration
   * @param thisNode the local peer; always forced into the configuration
   * @param peers initial peer set; when empty, the configuration persisted on disk is used
   * @param stateMachine user state machine that writes/reads are applied to
   * @param config TRaft tuning parameters
   * @throws IOException if the log store or configuration files cannot be created or read
   */
  TRaftServerImpl(
      String storageDir,
      Peer thisNode,
      TreeSet<Peer> peers,
      IStateMachine stateMachine,
      TRaftConfig config)
      throws IOException {
    this.storageDir = storageDir;
    this.thisNode = thisNode;
    this.stateMachine = stateMachine;
    this.config = config;
    this.logStore = new TRaftLogStore(storageDir);
    // Seed mixes in the node id so peers draw different election randomness from a shared seed.
    this.random = new Random(config.getElection().getRandomSeed() + thisNode.getNodeId());
    if (peers.isEmpty()) {
      // Restart path: recover the membership persisted by a previous run.
      this.configuration.addAll(loadConfigurationFromDisk());
    } else {
      this.configuration.addAll(peers);
      persistConfiguration();
    }
    if (!this.configuration.contains(thisNode)) {
      this.configuration.add(thisNode);
      persistConfiguration();
    }
    // Order matters: recover state, then pick the initial leader, then size follower tracking.
    recoverFromDisk();
    electInitialLeader();
    initFollowerInfoMap();
  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Lifecycle
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  public synchronized void start() {
+    if (started) {
+      return;
+    }
+    stateMachine.start();
+    started = true;
+    notifyLeaderChanged();
+    if (role == TRaftRole.LEADER) {
+      initAppenders();
+    }
+  }
+
  /** Stops this server: flips {@code started} so appender callbacks become no-ops, halts the appenders, then stops the state machine. */
  public synchronized void stop() {
    // Mark as stopped first so appender callbacks become no-ops immediately.
    started = false;
    stopAppenders();
    stateMachine.stop();
  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Write path (Leader)
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Process a client write request as the leader.
+   *
+   * <p><b>Step 1 – Parse:</b> Build a {@link TRaftLogEntry} with timestamp 
and partition index
+   * derived from the request.
+   *
+   * <p><b>Step 2 – Route by partition index:</b>
+   * <ul>
+   *   <li>If the entry's partition index is <em>greater than</em>
+   *       {@code currentLeaderInsertingPartitionIndex}: the entry and all 
subsequent entries go
+   *       directly to disk. The partition index pointer is advanced. A 
best-effort quorum-commit
+   *       check is performed (if the map is already empty the pointer 
advances cleanly; otherwise
+   *       it advances anyway and the appender will enforce ordering). All 
followers receive the
+   *       entry in their delayed queue; disk dispatch is attempted 
synchronously.
+   *   <li>If the entry's partition index <em>equals</em>
+   *       {@code currentLeaderInsertingPartitionIndex}: each follower is 
inspected individually.
+   *       Followers whose {@code currentFollowerReplicatingPartitionIndex} 
matches are sent the
+   *       entry immediately (fast path) and the index is recorded in
+   *       {@code currentReplicatingIndices}. Followers that are behind 
receive it as a delayed
+   *       entry and disk dispatch is attempted. The entry is then persisted 
to disk.
+   * </ul>
+   *
+   * <p><b>Step 3 – Quorum tracking:</b> Each sent entry is registered in
+   * {@code currentReplicationgIndicesToAckFollowerCount}. When a follower 
acknowledges the entry,
+   * its ACK count is incremented; once ≥ half the follower count the entry is 
removed from the
+   * map. When the map empties the current partition is fully committed and 
the next partition may
+   * be transmitted.
+   */
  public synchronized TSStatus write(IConsensusRequest request) {
    // Guard: only an active leader may accept writes.
    if (!active) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          String.format("Peer %s is inactive and cannot process writes", thisNode));
    }
    if (role != TRaftRole.LEADER) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          String.format("Peer %s is not leader, current leader id: %s", thisNode, leaderId));
    }

    TRaftLogEntry logEntry = buildLogEntry(request);

    // Apply to local state machine immediately (leader apply-on-write model).
    // NOTE(review): the entry is applied locally BEFORE it is persisted below; a crash between
    // the two loses durability for an already-applied write — confirm this is acceptable.
    TSStatus localStatus = stateMachine.write(stateMachine.deserializeRequest(request));
    if (!isSuccess(localStatus)) {
      return localStatus;
    }

    List<Peer> followers = getFollowers();
    int followerCount = followers.size();
    long entryPartitionIndex = logEntry.getPartitionIndex();

    if (entryPartitionIndex > currentLeaderInsertingPartitionIndex) {
      // ── Higher partition: write directly to disk ─────────────────────────
      // Log a debug note if there are still uncommitted entries from the old partition.
      if (!currentReplicationgIndicesToAckFollowerCount.isEmpty()) {
        LOGGER.debug(
            "Partition boundary crossed from {} to {} with {} uncommitted indices pending quorum",
            currentLeaderInsertingPartitionIndex,
            entryPartitionIndex,
            currentReplicationgIndicesToAckFollowerCount.size());
      }
      // Advance the fast-path pointer; the appenders enforce ordering for deferred entries.
      currentLeaderInsertingPartitionIndex = entryPartitionIndex;

      try {
        logStore.append(logEntry);
      } catch (IOException e) {
        LOGGER.error("Failed to append TRaft log entry {}", logEntry.getLogIndex(), e);
        return RpcUtils.getStatus(
            TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to persist TRaft log entry");
      }
      logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
      updatePartitionIndexStat(logEntry);

      // Add to every follower's delayed queue; attempt synchronous dispatch.
      for (Peer follower : followers) {
        TRaftFollowerInfo info =
            followerInfoMap.computeIfAbsent(
                follower.getNodeId(),
                key -> new TRaftFollowerInfo(getLatestPartitionIndex(), logicalClock.get()));
        info.addDelayedIndex(logEntry.getPartitionIndex(), logEntry.getLogIndex());
        limitDelayedEntriesIfNecessary(info);
        dispatchNextDelayedEntry(follower, info);
      }

    } else {
      // ── Same partition: attempt fast-path replication per follower ────────
      // Register the index for quorum tracking before any send, so ACKs are never dropped.
      if (followerCount > 0) {
        currentReplicationgIndicesToAckFollowerCount.put(
            logEntry.getLogIndex(), new AtomicInteger(0));
      }

      List<Peer> delayedFollowers = new ArrayList<>();
      for (Peer follower : followers) {
        // NOTE(review): this branch seeds the follower info with logicalClock.get() + 1 while
        // the disk path above uses logicalClock.get() — confirm the discrepancy is intentional.
        TRaftFollowerInfo info =
            followerInfoMap.computeIfAbsent(
                follower.getNodeId(),
                key -> new TRaftFollowerInfo(getLatestPartitionIndex(), logicalClock.get() + 1));

        if (entryPartitionIndex == info.getCurrentReplicatingPartitionIndex()) {
          // Fast path: record index and send directly.
          info.addCurrentReplicatingIndex(logEntry.getLogIndex());
          boolean success = sendEntryToFollower(follower, logEntry);
          if (success) {
            info.recordMemoryReplicationSuccess();
            onFollowerAck(follower, info, logEntry);
          } else {
            // Send failed (e.g. follower offline): fall back to the delayed/disk path.
            onFollowerSendFailed(info, logEntry);
            delayedFollowers.add(follower);
          }
        } else if (entryPartitionIndex > info.getCurrentReplicatingPartitionIndex()) {
          // Follower is behind – defer to disk path.
          delayedFollowers.add(follower);
        }
        // entryPartitionIndex < info.getCurrentReplicatingPartitionIndex() should not happen.
      }

      // Persist to disk after in-memory replication attempts.
      try {
        logStore.append(logEntry);
      } catch (IOException e) {
        LOGGER.error("Failed to append TRaft log entry {}", logEntry.getLogIndex(), e);
        return RpcUtils.getStatus(
            TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to persist TRaft log entry");
      }
      logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
      updatePartitionIndexStat(logEntry);

      // For delayed followers: enqueue + synchronous disk dispatch.
      for (Peer follower : delayedFollowers) {
        TRaftFollowerInfo info = followerInfoMap.get(follower.getNodeId());
        if (info == null) {
          continue;
        }
        info.addDelayedIndex(logEntry.getPartitionIndex(), logEntry.getLogIndex());
        limitDelayedEntriesIfNecessary(info);
        dispatchNextDelayedEntry(follower, info);
      }
    }

    return localStatus;
  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Read path
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Evaluates a read-only request directly against the local state machine.
+   *
+   * <p>No leader/lease check is performed here, so a read served by a lagging replica may
+   * observe stale data — NOTE(review): confirm this relaxed read consistency is intended.
+   *
+   * @param request the query to evaluate
+   * @return the result produced by the local state machine
+   */
+  public synchronized DataSet read(IConsensusRequest request) {
+    return stateMachine.read(request);
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Follower RPC handler
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Receive a replicated log entry from the leader.
+   *
+   * <p><b>Term check:</b>
+   * <ul>
+   *   <li>If the leader's term is less than the follower's current term the 
request is rejected
+   *       so the leader can discover it is stale and step down.
+   *   <li>If the leader's term is greater the follower updates its own term 
and resets its vote.
+   * </ul>
+   *
+   * <p><b>Partition index check (after term is accepted):</b>
+   * <ul>
+   *   <li>If the entry's partition index is <em>less than</em> the follower's 
current maximum
+   *       partition index, the follower is ahead of what the leader is 
sending – this is an
+   *       inconsistency and an error is returned.
+   *   <li>If the entry's partition index is equal to or greater than the 
follower's maximum, the
+   *       entry is accepted, persisted, and applied to the state machine.
+   * </ul>
+   */
+  synchronized TSStatus receiveReplicatedLog(TRaftLogEntry logEntry, int 
newLeaderId, long term) {
+    // ── Term comparison 
──────────────────────────────────────────────────────
+    if (term < currentTerm) {
+      LOGGER.warn(
+          "Rejecting replicated log from leader {} with stale term {} 
(currentTerm={})",
+          newLeaderId,
+          term,
+          currentTerm);
+      return RpcUtils.getStatus(
+          TSStatusCode.WRITE_PROCESS_REJECT,
+          "Leader term " + term + " is stale; follower currentTerm=" + 
currentTerm);
+    }
+    if (term > currentTerm) {
+      currentTerm = term;
+      votedFor = -1;
+    }
+    becomeFollower(newLeaderId);
+
+    // Idempotency: already have this log index.
+    if (logStore.contains(logEntry.getLogIndex())) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+
+    // ── Partition index comparison 
───────────────────────────────────────────
+    if (logEntry.getPartitionIndex() < maxPartitionIndex) {
+      // Follower's partition is ahead of what the leader sent – inconsistency.
+      LOGGER.error(
+          "Partition inconsistency on follower {}: entry partitionIndex={}, 
follower maxPartitionIndex={}",
+          thisNode,
+          logEntry.getPartitionIndex(),
+          maxPartitionIndex);
+      return RpcUtils.getStatus(
+          TSStatusCode.INTERNAL_SERVER_ERROR,
+          "Partition index inconsistency: follower maxPartition="
+              + maxPartitionIndex
+              + " > entry partition="
+              + logEntry.getPartitionIndex());
+    }
+
+    // ── Persist and apply 
────────────────────────────────────────────────────
+    try {
+      logStore.append(logEntry);
+    } catch (IOException e) {
+      LOGGER.error("Failed to persist replicated TRaft log {}", 
logEntry.getLogIndex(), e);
+      return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, "Failed to 
persist log");
+    }
+
+    logicalClock.updateAndGet(v -> Math.max(v, logEntry.getLogIndex()));
+    updatePartitionIndexStat(logEntry);
+
+    IConsensusRequest deserializedRequest =
+        
stateMachine.deserializeRequest(TRaftRequestParser.buildRequest(logEntry.getData()));
+    deserializedRequest.markAsGeneratedByRemoteConsensusLeader();
+    return stateMachine.write(deserializedRequest);
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Election
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  synchronized TRaftVoteResult requestVote(TRaftVoteRequest voteRequest) {
+    if (voteRequest.getTerm() < currentTerm) {
+      return new TRaftVoteResult(false, currentTerm);
+    }
+    if (voteRequest.getTerm() > currentTerm) {
+      currentTerm = voteRequest.getTerm();
+      votedFor = voteRequest.getCandidateId();
+      becomeFollower(-1);
+      return new TRaftVoteResult(true, currentTerm);
+    }
+    if (votedFor != -1 && votedFor != voteRequest.getCandidateId()) {
+      return new TRaftVoteResult(false, currentTerm);
+    }
+    int freshnessCompareResult =
+        compareCandidateFreshness(
+            voteRequest.getPartitionIndex(),
+            voteRequest.getCurrentPartitionIndexCount(),
+            maxPartitionIndex,
+            currentPartitionIndexCount);
+    boolean shouldVote = freshnessCompareResult > 0;
+    if (!shouldVote && freshnessCompareResult == 0) {
+      shouldVote = random.nextBoolean();
+    }
+    if (!shouldVote) {
+      return new TRaftVoteResult(false, currentTerm);
+    }
+    votedFor = voteRequest.getCandidateId();
+    return new TRaftVoteResult(true, currentTerm);
+  }
+
+  synchronized boolean campaignLeader() {
+    if (!active) {
+      return false;
+    }
+    role = TRaftRole.CANDIDATE;
+    leaderId = -1;
+    currentTerm++;
+    votedFor = thisNode.getNodeId();
+    int grantVotes = 1;
+    TRaftVoteRequest voteRequest =
+        new TRaftVoteRequest(
+            thisNode.getNodeId(), currentTerm, maxPartitionIndex, 
currentPartitionIndexCount);
+    for (Peer follower : getFollowers()) {
+      Optional<TRaftServerImpl> followerServer = 
TRaftNodeRegistry.resolveServer(follower);
+      if (!followerServer.isPresent()) {
+        continue;
+      }
+      TRaftVoteResult voteResult = 
followerServer.get().requestVote(voteRequest);
+      if (voteResult.getTerm() > currentTerm) {
+        currentTerm = voteResult.getTerm();
+        becomeFollower(-1);
+        return false;
+      }
+      if (voteResult.isGranted()) {
+        grantVotes++;
+      }
+    }
+    if (grantVotes > configuration.size() / 2) {
+      becomeLeader();
+      return true;
+    }
+    becomeFollower(-1);
+    return false;
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Configuration management
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  synchronized void transferLeader(Peer newLeader) {
+    if (Objects.equals(newLeader, thisNode)) {
+      campaignLeader();
+      return;
+    }
+    Optional<TRaftServerImpl> targetServer = 
TRaftNodeRegistry.resolveServer(newLeader);
+    if (!targetServer.isPresent()) {
+      return;
+    }
+    targetServer.get().campaignLeader();
+    if (targetServer.get().isLeader()) {
+      becomeFollower(newLeader.getNodeId());
+    }
+  }
+
+  synchronized void resetPeerList(List<Peer> newPeers) throws IOException {
+    configuration.clear();
+    configuration.addAll(newPeers);
+    if (!configuration.contains(thisNode)) {
+      configuration.add(thisNode);
+    }
+    persistConfiguration();
+    electInitialLeader();
+    initFollowerInfoMap();
+    if (started && role == TRaftRole.LEADER) {
+      stopAppenders();
+      initAppenders();
+    }
+  }
+
+  synchronized void addPeer(Peer peer) throws IOException {
+    configuration.add(peer);
+    persistConfiguration();
+    initFollowerInfoMap();
+    if (started && role == TRaftRole.LEADER) {
+      stopAppenders();
+      initAppenders();
+    }
+  }
+
+  synchronized void removePeer(Peer peer) throws IOException {
+    configuration.remove(peer);
+    persistConfiguration();
+    initFollowerInfoMap();
+    if (started && role == TRaftRole.LEADER) {
+      stopAppenders();
+      initAppenders();
+    }
+    if (peer.getNodeId() == leaderId) {
+      electInitialLeader();
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Background appender: called by TRaftLogAppender every 
waitingReplicationTimeMs
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Entry point for the per-follower {@link TRaftLogAppender} background 
thread.
+   *
+   * <p>Checks whether the follower has pending delayed entries and, if so, 
attempts to dispatch
+   * the next one from disk. This provides a periodic retry mechanism for 
followers that were
+   * offline during the original write, and ensures that entries written to 
disk during partition
+   * transitions are eventually transmitted.
+   */
+  synchronized void tryReplicateDiskEntriesToFollower(Peer follower) {
+    if (!started || role != TRaftRole.LEADER || !active) {
+      return;
+    }
+    TRaftFollowerInfo info = followerInfoMap.get(follower.getNodeId());
+    if (info == null || !info.hasDelayedEntries() || 
info.hasCurrentReplicatingIndex()) {
+      return;
+    }
+    dispatchNextDelayedEntry(follower, info);
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Accessors
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  synchronized Peer getLeader() {
+    return configuration.stream()
+        .filter(peer -> peer.getNodeId() == leaderId)
+        .findFirst()
+        .orElse(null);
+  }
+
+  /** Snapshot copy of the current membership; safe for callers to mutate. */
+  synchronized List<Peer> getConfiguration() {
+    return new ArrayList<>(configuration);
+  }
+
+  /** Highest log index observed so far (monotonic). */
+  synchronized long getLogicalClock() {
+    return logicalClock.get();
+  }
+
+  synchronized boolean isLeader() {
+    return role == TRaftRole.LEADER;
+  }
+
+  /** Leader that is also active; used to gate writes. */
+  synchronized boolean isLeaderReady() {
+    return isLeader() && active;
+  }
+
+  synchronized boolean isReadOnly() {
+    return stateMachine.isReadOnly();
+  }
+
+  synchronized boolean isActive() {
+    return active;
+  }
+
+  synchronized void setActive(boolean active) {
+    this.active = active;
+  }
+
+  /** Number of entries observed in the current (maximum) partition. */
+  synchronized long getCurrentPartitionIndexCount() {
+    return currentPartitionIndexCount;
+  }
+
+  synchronized List<TRaftLogEntry> getLogEntries() {
+    return logStore.getAllEntries();
+  }
+
+  synchronized TRaftFollowerInfo getFollowerInfo(int followerNodeId) {
+    return followerInfoMap.get(followerNodeId);
+  }
+
+  /** Partition index of the last persisted entry, or INITIAL_PARTITION_INDEX if the log is empty. */
+  synchronized long getLatestPartitionIndex() {
+    TRaftLogEntry last = logStore.getLastEntry();
+    return last == null ? INITIAL_PARTITION_INDEX : last.getPartitionIndex();
+  }
+
+  synchronized long getCurrentLeaderInsertingPartitionIndex() {
+    return currentLeaderInsertingPartitionIndex;
+  }
+
+  // NOTE(review): the backing field name contains a typo ("Replicationg"); renaming would
+  // touch code outside this view.
+  synchronized int getPendingQuorumEntryCount() {
+    return currentReplicationgIndicesToAckFollowerCount.size();
+  }
+
+  synchronized void reloadConsensusConfig(TRaftConfig config) {
+    this.config = config;
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: log entry construction
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private TRaftLogEntry buildLogEntry(IConsensusRequest request) {
+    TRaftLogEntry previous = logStore.getLastEntry();
+    long timestamp =
+        TRaftRequestParser.extractTimestamp(
+            request, historicalMaxTimestamp == Long.MIN_VALUE ? 0 : 
historicalMaxTimestamp);
+    long logIndex = previous == null ? 1 : previous.getLogIndex() + 1;
+    long logTerm = currentTerm;
+    long partitionIndex;
+    long interPartitionIndex;
+    long lastPartitionCount;
+    if (previous == null) {
+      partitionIndex = INITIAL_PARTITION_INDEX;
+      interPartitionIndex = INITIAL_PARTITION_INDEX;
+      lastPartitionCount = 0;
+      historicalMaxTimestamp = timestamp;
+    } else if (timestamp < historicalMaxTimestamp) {
+      partitionIndex = previous.getPartitionIndex() + 1;
+      interPartitionIndex = 0;
+      lastPartitionCount = previous.getInterPartitionIndex();
+    } else {
+      partitionIndex = previous.getPartitionIndex();
+      interPartitionIndex = previous.getInterPartitionIndex() + 1;
+      lastPartitionCount = previous.getLastPartitionCount();
+      historicalMaxTimestamp = Math.max(historicalMaxTimestamp, timestamp);
+    }
+    return new TRaftLogEntry(
+        timestamp,
+        partitionIndex,
+        logIndex,
+        logTerm,
+        interPartitionIndex,
+        lastPartitionCount,
+        TRaftRequestParser.extractRawRequest(request));
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: follower ACK and quorum tracking
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Called when a follower successfully receives and acknowledges {@code 
ackEntry}.
+   *
+   * <ol>
+   *   <li>Clears the entry from the follower's in-flight tracking.
+   *   <li>Updates the quorum commit counter for this log index.
+   *   <li>Chains dispatch of the next pending delayed entry for this follower.
+   * </ol>
+   */
+  private void onFollowerAck(Peer follower, TRaftFollowerInfo info, 
TRaftLogEntry ackEntry) {
+    info.removeCurrentReplicatingIndex(ackEntry.getLogIndex());
+    info.removeDelayedIndex(ackEntry.getPartitionIndex(), 
ackEntry.getLogIndex());
+    updateQuorumTracking(ackEntry.getLogIndex());
+    dispatchNextDelayedEntry(follower, info);
+  }
+
+  /**
+   * Increments the ACK counter for {@code logIndex}. When the count reaches 
at least half the
+   * follower count the entry is considered committed and removed from the 
tracking map.
+   */
+  private void updateQuorumTracking(long logIndex) {
+    AtomicInteger counter = 
currentReplicationgIndicesToAckFollowerCount.get(logIndex);
+    if (counter == null) {
+      return;
+    }
+    int count = counter.incrementAndGet();
+    int followerCount = getFollowers().size();
+    // Quorum: ACK count >= half of follower count (per the requirement).
+    // Using count * 2 >= followerCount avoids floating-point and handles all 
sizes correctly.
+    if (followerCount > 0 && count * 2 >= followerCount) {
+      currentReplicationgIndicesToAckFollowerCount.remove(logIndex);
+    }
+  }
+
+  private void onFollowerSendFailed(TRaftFollowerInfo info, TRaftLogEntry 
failedEntry) {
+    info.removeCurrentReplicatingIndex(failedEntry.getLogIndex());
+    info.addDelayedIndex(failedEntry.getPartitionIndex(), 
failedEntry.getLogIndex());
+    limitDelayedEntriesIfNecessary(info);
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: disk dispatch
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Attempts to dispatch the next pending delayed log entry for {@code 
follower} from the local
+   * disk. After a successful dispatch the chain continues via {@link 
#onFollowerAck} so that
+   * subsequent delayed entries in the same (or an advanced) partition are 
sent without waiting
+   * for the next appender cycle.
+   */
+  private void dispatchNextDelayedEntry(Peer follower, TRaftFollowerInfo info) 
{
+    if (info.hasCurrentReplicatingIndex()) {
+      return;
+    }
+
+    // Advance the follower's current replicating partition if possible.
+    Long firstDelayedPartition = info.getFirstDelayedPartitionIndex();
+    if (firstDelayedPartition == null) {
+      return;
+    }
+    if (firstDelayedPartition > info.getCurrentReplicatingPartitionIndex()) {
+      info.setCurrentReplicatingPartitionIndex(firstDelayedPartition);
+    }
+
+    Long delayedIndex =
+        
info.getFirstDelayedIndexOfPartition(info.getCurrentReplicatingPartitionIndex());
+    if (delayedIndex == null) {
+      return;
+    }
+    info.setNextPartitionFirstIndex(delayedIndex);
+
+    TRaftLogEntry diskEntry = logStore.getByIndex(delayedIndex);
+    if (diskEntry == null) {
+      // Entry not yet on disk (race between write path and appender); retry 
next cycle.
+      return;
+    }
+
+    info.addCurrentReplicatingIndex(diskEntry.getLogIndex());
+    boolean success = sendEntryToFollower(follower, diskEntry);
+    if (success) {
+      info.recordDiskReplicationSuccess();
+      // ACK handling: remove from in-flight, update quorum, chain next 
dispatch.
+      info.removeCurrentReplicatingIndex(diskEntry.getLogIndex());
+      info.removeDelayedIndex(diskEntry.getPartitionIndex(), 
diskEntry.getLogIndex());
+      updateQuorumTracking(diskEntry.getLogIndex());
+      // Recurse into the chain until there is nothing left or a send fails.
+      dispatchNextDelayedEntry(follower, info);
+    } else {
+      info.removeCurrentReplicatingIndex(diskEntry.getLogIndex());
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: RPC helpers
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private boolean sendEntryToFollower(Peer follower, TRaftLogEntry entry) {
+    Optional<TRaftServerImpl> followerServer = 
TRaftNodeRegistry.resolveServer(follower);
+    if (!followerServer.isPresent()) {
+      return false;
+    }
+    TSStatus status =
+        followerServer
+            .get()
+            .receiveReplicatedLog(entry.copy(), thisNode.getNodeId(), 
currentTerm);
+    return isSuccess(status);
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: misc helpers
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private void limitDelayedEntriesIfNecessary(TRaftFollowerInfo info) {
+    if (info.delayedEntryCount()
+        <= config.getReplication().getMaxPendingRetryEntriesPerFollower()) {
+      return;
+    }
+    LOGGER.warn(
+        "Delayed TRaft entries for a follower exceed limit {}. Current count: 
{}",
+        config.getReplication().getMaxPendingRetryEntriesPerFollower(),
+        info.delayedEntryCount());
+  }
+
+  private List<Peer> getFollowers() {
+    List<Peer> followers = new ArrayList<>();
+    for (Peer peer : configuration) {
+      if (peer.getNodeId() != thisNode.getNodeId()) {
+        followers.add(peer);
+      }
+    }
+    return followers;
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: follower info init
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private void initFollowerInfoMap() {
+    followerInfoMap.clear();
+    long latestPartitionIndex = getLatestPartitionIndex();
+    long nextLogIndex = logicalClock.get() + 1;
+    for (Peer follower : getFollowers()) {
+      followerInfoMap.put(
+          follower.getNodeId(), new TRaftFollowerInfo(latestPartitionIndex, 
nextLogIndex));
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: appender lifecycle
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private void initAppenders() {
+    if (appenderExecutor == null || appenderExecutor.isShutdown()) {
+      appenderExecutor = Executors.newCachedThreadPool(
+          r -> {
+            Thread t = new Thread(r);
+            t.setName("TRaftAppender-" + thisNode.getGroupId());
+            t.setDaemon(true);
+            return t;
+          });
+    }
+    long waitMs = config.getReplication().getWaitingReplicationTimeMs();
+    for (Peer follower : getFollowers()) {
+      TRaftLogAppender appender = new TRaftLogAppender(this, follower, waitMs);
+      appenderMap.put(follower.getNodeId(), appender);
+      appenderExecutor.submit(appender);
+    }
+  }
+
+  /**
+   * Stops all appenders without blocking.
+   *
+   * <p>This runs while holding the server monitor, and appender threads take that same monitor
+   * inside tryReplicateDiskEntriesToFollower(), so waiting for them here would deadlock.
+   * Instead each appender is signalled via stop(), its thread is interrupted through
+   * shutdownNow(), and the started/role guards in tryReplicateDiskEntriesToFollower() turn any
+   * in-flight iteration into a no-op; the threads exit on their next loop iteration.
+   */
+  private void stopAppenders() {
+    appenderMap.values().forEach(TRaftLogAppender::stop);
+    appenderMap.clear();
+    if (appenderExecutor != null) {
+      appenderExecutor.shutdownNow();
+      appenderExecutor = null;
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: leader/follower transitions
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private void electInitialLeader() {
+    leaderId =
+        configuration.stream()
+            .map(Peer::getNodeId)
+            .max(Integer::compareTo)
+            .orElse(thisNode.getNodeId());
+    if (leaderId == thisNode.getNodeId()) {
+      role = TRaftRole.LEADER;
+    } else {
+      role = TRaftRole.FOLLOWER;
+    }
+  }
+
+  /**
+   * Promotes this node to leader: records the self-vote, drops quorum counters from any
+   * previous leadership stint, rebuilds follower bookkeeping, and — when the server is already
+   * started — restarts the appender threads and notifies the state machine.
+   */
+  private void becomeLeader() {
+    role = TRaftRole.LEADER;
+    leaderId = thisNode.getNodeId();
+    votedFor = thisNode.getNodeId();
+    // Stale quorum counters from an earlier term must not leak into this one.
+    currentReplicationgIndicesToAckFollowerCount.clear();
+    initFollowerInfoMap();
+    if (started) {
+      stopAppenders();
+      initAppenders();
+      notifyLeaderChanged();
+    }
+  }
+
+  private void becomeFollower(int newLeaderId) {
+    boolean wasLeader = (role == TRaftRole.LEADER);
+    role = TRaftRole.FOLLOWER;
+    leaderId = newLeaderId;
+    if (wasLeader) {
+      stopAppenders();
+    }
+    if (started) {
+      notifyLeaderChanged();
+    }
+  }
+
+  private void notifyLeaderChanged() {
+    if (leaderId != -1) {
+      stateMachine.event().notifyLeaderChanged(thisNode.getGroupId(), 
leaderId);
+    }
+    if (role == TRaftRole.LEADER) {
+      stateMachine.event().notifyLeaderReady();
+    } else {
+      stateMachine.event().notifyNotLeader();
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: recovery
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private void recoverFromDisk() {
+    List<TRaftLogEntry> entries = logStore.getAllEntries();
+    for (TRaftLogEntry entry : entries) {
+      logicalClock.updateAndGet(v -> Math.max(v, entry.getLogIndex()));
+      currentTerm = Math.max(currentTerm, entry.getLogTerm());
+      historicalMaxTimestamp = Math.max(historicalMaxTimestamp, 
entry.getTimestamp());
+      updatePartitionIndexStat(entry);
+    }
+    // On recovery, the inserting partition pointer starts at the maximum 
persisted partition.
+    currentLeaderInsertingPartitionIndex = maxPartitionIndex;
+  }
+
+  private void updatePartitionIndexStat(TRaftLogEntry entry) {
+    if (entry.getPartitionIndex() > maxPartitionIndex) {
+      maxPartitionIndex = entry.getPartitionIndex();
+      currentPartitionIndexCount = 1;
+      return;
+    }
+    if (entry.getPartitionIndex() == maxPartitionIndex) {
+      currentPartitionIndexCount++;
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: configuration persistence
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private TreeSet<Peer> loadConfigurationFromDisk() throws IOException {
+    TreeSet<Peer> peers = new TreeSet<>();
+    File file = new File(storageDir, CONFIGURATION_FILE_NAME);
+    if (!file.exists()) {
+      return peers;
+    }
+    try (BufferedReader reader = Files.newBufferedReader(file.toPath(), 
StandardCharsets.UTF_8)) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] split = line.split(",", 3);
+        if (split.length != 3) {
+          continue;
+        }
+        int nodeId = Integer.parseInt(split[0]);
+        String ip = split[1];
+        int port = Integer.parseInt(split[2]);
+        peers.add(new Peer(thisNode.getGroupId(), nodeId, new TEndPoint(ip, 
port)));
+      }
+    }
+    return peers;
+  }
+
+  private void persistConfiguration() throws IOException {
+    File file = new File(storageDir, CONFIGURATION_FILE_NAME);
+    try (BufferedWriter writer = Files.newBufferedWriter(file.toPath(), 
StandardCharsets.UTF_8)) {
+      for (Peer peer : configuration) {
+        writer.write(
+            String.format(
+                "%s,%s,%s",
+                peer.getNodeId(), peer.getEndpoint().getIp(), 
peer.getEndpoint().getPort()));
+        writer.newLine();
+      }
+    }
+  }
+
+  // 
────────────────────────────────────────────────────────────────────────────
+  // Private: election utilities
+  // 
────────────────────────────────────────────────────────────────────────────
+
+  private int compareCandidateFreshness(
+      long candidatePartitionIndex,
+      long candidatePartitionCount,
+      long localPartitionIndex,
+      long localPartitionCount) {
+    int partitionCompare = Long.compare(candidatePartitionIndex, 
localPartitionIndex);
+    if (partitionCompare != 0) {
+      return partitionCompare;
+    }
+    return Long.compare(candidatePartitionCount, localPartitionCount);
+  }
+
+  private boolean isSuccess(TSStatus status) {
+    return status != null
+        && (status.getCode() == 0
+            || status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
similarity index 51%
copy from 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
index cae54c2bcbc..9393f518804 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteRequest.java
@@ -17,31 +17,35 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.common.request;
-
-import java.nio.ByteBuffer;
-
-public interface IConsensusRequest {
-  /**
-   * Serialize all the data to a ByteBuffer.
-   *
-   * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to 
reduce the number of
-   * memory copies.
-   *
-   * <p>To improve efficiency, a specific implementation could return a 
DirectByteBuffer to reduce
-   * the memory copy required to send an RPC
-   *
-   * <p>Note: The implementation needs to ensure that the data in the returned 
Bytebuffer cannot be
-   * changed or an error may occur
-   */
-  ByteBuffer serializeToByteBuffer();
-
-  default long getMemorySize() {
-    // return 0 by default
-    return 0;
+package org.apache.iotdb.consensus.traft;
+
+/**
+ * Immutable RequestVote payload: the candidate's id and term plus its freshness information
+ * (maximum partition index and entry count within that partition) used by the receiver's
+ * election-restriction check.
+ */
+class TRaftVoteRequest {
+
+  private final int candidateId;
+  private final long term;
+  private final long partitionIndex;
+  private final long currentPartitionIndexCount;
+
+  TRaftVoteRequest(int candidateId, long term, long partitionIndex, long 
currentPartitionIndexCount) {
+    this.candidateId = candidateId;
+    this.term = term;
+    this.partitionIndex = partitionIndex;
+    this.currentPartitionIndexCount = currentPartitionIndexCount;
+  }
+
+  int getCandidateId() {
+    return candidateId;
+  }
+
+  long getTerm() {
+    return term;
+  }
+
+  long getPartitionIndex() {
+    return partitionIndex;
   }
 
-  default void markAsGeneratedByRemoteConsensusLeader() {
-    // do nothing by default
+  long getCurrentPartitionIndexCount() {
+    return currentPartitionIndexCount;
   }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
similarity index 51%
copy from 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
index cae54c2bcbc..6e905c3f88e 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/traft/TRaftVoteResult.java
@@ -17,31 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.consensus.common.request;
+package org.apache.iotdb.consensus.traft;
 
-import java.nio.ByteBuffer;
+/** Immutable RequestVote reply: whether the vote was granted and the responder's term. */
+class TRaftVoteResult {
 
-public interface IConsensusRequest {
-  /**
-   * Serialize all the data to a ByteBuffer.
-   *
-   * <p>In a specific implementation, ByteBuf or PublicBAOS can be used to 
reduce the number of
-   * memory copies.
-   *
-   * <p>To improve efficiency, a specific implementation could return a 
DirectByteBuffer to reduce
-   * the memory copy required to send an RPC
-   *
-   * <p>Note: The implementation needs to ensure that the data in the returned 
Bytebuffer cannot be
-   * changed or an error may occur
-   */
-  ByteBuffer serializeToByteBuffer();
+  private final boolean granted;
+  private final long term;
 
-  default long getMemorySize() {
-    // return 0 by default
-    return 0;
+  TRaftVoteResult(boolean granted, long term) {
+    this.granted = granted;
+    this.term = term;
   }
 
-  default void markAsGeneratedByRemoteConsensusLeader() {
-    // do nothing by default
+  boolean isGranted() {
+    return granted;
+  }
+
+  long getTerm() {
+    return term;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index a33cdd11024..7481a261fef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.consensus.config.PipeConsensusConfig;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
+import org.apache.iotdb.consensus.config.TRaftConfig;
 import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -286,6 +287,7 @@ public class DataRegionConsensusImpl {
                                   CONF.getConnectionTimeoutInMS(), 
TimeUnit.MILLISECONDS))
                           .build())
                   .build())
+          .setTRaftConfig(TRaftConfig.newBuilder().build())
           .build();
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 2e517700217..369324b0850 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -259,6 +259,16 @@ public class PipeEnrichedInsertNode extends InsertNode {
     return insertNode.getMinTime();
   }
 
+  // Pure delegation: the pipe-enriched wrapper forwards time accessors to the wrapped node.
+  @Override
+  public boolean hasTime() {
+    return insertNode.hasTime();
+  }
+
+  @Override
+  public long getTime() {
+    return insertNode.getTime();
+  }
+
   @Override
   public void markFailedMeasurement(final int index) {
     insertNode.markFailedMeasurement(index);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 88a6faa0047..021317b496d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -239,6 +239,7 @@ public abstract class InsertNode extends SearchNode {
     switch (config.getDataRegionConsensusProtocolClass()) {
       case ConsensusFactory.IOT_CONSENSUS:
       case ConsensusFactory.IOT_CONSENSUS_V2:
+      case ConsensusFactory.TRAFT_CONSENSUS:
       case ConsensusFactory.RATIS_CONSENSUS:
         return isGeneratedByRemoteConsensusLeader;
       case ConsensusFactory.SIMPLE_CONSENSUS:
@@ -315,6 +316,11 @@ public abstract class InsertNode extends SearchNode {
 
   public abstract long getMinTime();
 
+  @Override
+  public boolean hasTime() {
+    return true; // default for all insert nodes: they carry timestamps; getTime() supplies the representative value
+  }
+
   // region partial insert
   public void markFailedMeasurement(int index) {
     throw new UnsupportedOperationException();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index a2bd6cb1a00..57115b8061a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -208,6 +208,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
     this.values = values;
   }
 
+  @Override // getTime() is now declared on InsertNode; a single row's representative time is its own timestamp
   public long getTime() {
     return time;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 7392b761270..83c35482d90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -301,6 +301,11 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
     return visitor.visitInsertRows(this, context);
   }
 
+  @Override
+  public long getTime() {
+    return getMinTime(); // a batch of rows is represented by the earliest timestamp across insertRowNodeList
+  }
+
   @Override
   public long getMinTime() {
     return insertRowNodeList.stream()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 39683e5d9f9..7fb2454624c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -449,6 +449,11 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     columns[index] = null;
   }
 
+  @Override
+  public long getTime() {
+    return getMinTime(); // delegates to getMinTime(), which returns times[0] — assumes times[] is ascending; TODO confirm
+  }
+
   @Override
   public long getMinTime() {
     return times[0];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index 08c78328732..1df485fffea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -108,6 +108,11 @@ public class RelationalInsertRowNode extends InsertRowNode 
{
     return visitor.visitRelationalInsertRow(this, context);
   }
 
+  @Override
+  public long getTime() {
+    return super.getTime(); // NOTE(review): redundant override — identical to inherited InsertRowNode.getTime(); safe to remove
+  }
+
   public static RelationalInsertRowNode deserialize(ByteBuffer byteBuffer) {
     RelationalInsertRowNode insertNode = new RelationalInsertRowNode(new 
PlanNodeId(""));
     insertNode.subDeserialize(byteBuffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 594ccf50471..f316177e263 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -79,6 +79,12 @@ public class RelationalInsertRowsNode extends InsertRowsNode 
{
     return visitor.visitRelationalInsertRows(this, context);
   }
 
+  @Override
+  public long getTime() {
+    return super.getTime(); // NOTE(review): redundant override — identical to inherited InsertRowsNode.getTime(); safe to remove
+  }
+
+
   public static RelationalInsertRowsNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId;
     List<InsertRowNode> insertRowNodeList = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 257f691e4a7..01ae91d64e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -175,6 +175,11 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
     return visitor.visitRelationalInsertTablet(this, context);
   }
 
+  @Override
+  public long getTime() {
+    return super.getTime(); // NOTE(review): redundant override — identical to inherited InsertTabletNode.getTime(); safe to remove
+  }
+
   @Override
   protected InsertTabletNode getEmptySplit(int count) {
     long[] subTimes = new long[count];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2d7cbe18358..03c06136d5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -240,7 +240,7 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
       if (isFirstStart) {
         sendRegisterRequestToConfigNode(true);
         
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
-        IoTDBStartCheck.getInstance().serializeEncryptMagicString();
+//        IoTDBStartCheck.getInstance().serializeEncryptMagicString(); // TODO(review): commenting out encrypt-magic-string serialization on first start looks like a debug leftover — confirm before merging to a release branch
         ConfigNodeInfo.getInstance().storeConfigNodeList();
         // Register this DataNode to the cluster when first start
         sendRegisterRequestToConfigNode(false);

Reply via email to