This is an automated email from the ASF dual-hosted git repository.

hxpserein pushed a commit to branch research/airreplication
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/airreplication by 
this push:
     new f9e05a4a44d system design for air-replication (#17012)
f9e05a4a44d is described below

commit f9e05a4a44d5eb7ffb343add549158a73377c4b8
Author: Gewu <[email protected]>
AuthorDate: Mon Jan 12 18:13:25 2026 +0800

    system design for air-replication (#17012)
---
 .../iotdb/db/airreplication/AirReplication.java    | 600 +++++++++++++++++
 .../airreplication/AirReplicationPeerManager.java  |  73 +++
 .../airreplication/AirReplicationServerImpl.java   | 716 +++++++++++++++++++++
 .../iotdb/db/airreplication/PipeConsensus.java     | 600 +++++++++++++++++
 .../airreplication/AirReplicationDispatcher.java   |  42 ++
 .../airreplication/AirReplicationGuardian.java     |  26 +
 .../airreplication/AirReplicationManager.java      | 157 +++++
 .../airreplication/AirReplicationName.java         |  98 +++
 .../airreplication/AirReplicationReceiver.java     |  30 +
 .../airreplication/AirReplicationSelector.java     |  28 +
 .../airreplication/AirReplicationSink.java         |  25 +
 .../airreplication/ReplicateProgressManager.java   |  37 ++
 .../metric/AirReplicationServerMetrics.java        | 190 ++++++
 .../metric/AirReplicationSyncLagManager.java       | 157 +++++
 .../service/AirReplicationRPCService.java          | 106 +++
 .../service/AirReplicationRPCServiceHandler.java   |  51 ++
 .../service/AirReplicationRPCServiceMBean.java     |  22 +
 .../service/AirReplicationRPCServiceProcessor.java | 225 +++++++
 18 files changed, 3183 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
new file mode 100644
index 00000000000..77b217625b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import 
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+public class AirReplication implements IConsensus {
+  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = 
"replication_air_guardian";
+  private static final String CLASS_NAME = 
AirReplication.class.getSimpleName();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AirReplication.class);
+
+  private final TEndPoint thisNode;
+  private final int thisNodeId;
+  private final File storageDir;
+  private final IStateMachine.Registry registry;
+  private final Map<ConsensusGroupId, AirReplicationServerImpl> 
stateMachineMap =
+      new ConcurrentHashMap<>();
+  private final AirReplicationRPCService rpcService;
+  private final RegisterManager registerManager = new RegisterManager();
+  private final Map<ConsensusGroupId, ReentrantLock> 
replicationGroupIdReentrantLockMap =
+      new ConcurrentHashMap<>();
+  private final ReentrantReadWriteLock stateMachineMapLock = new 
ReentrantReadWriteLock();
+  private final AirReplicationConfig config;
+  private final AirReplicationManager airReplicationManager;
+  private final AirReplicationGuardian airReplicationGuardian;
+  private final IClientManager<TEndPoint, AsyncAirReplicationServiceClient> 
asyncClientManager;
+  private final IClientManager<TEndPoint, SyncAirReplicationServiceClient> 
syncClientManager;
+  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
+  public AirReplication(ConsensusConfig config, IStateMachine.Registry 
registry) {
+    this.thisNode = config.getThisNodeEndPoint();
+    this.thisNodeId = config.getThisNodeId();
+    this.storageDir = new File(config.getStorageDir());
+    this.config = config.getAirReplicationConfig();
+    this.registry = registry;
+    this.rpcService = new AirReplicationRPCService(thisNode, 
config.getAirReplicationConfig());
+    this.airReplicationManager =
+        new AirReplicationManager(
+            config.getAirReplicationConfig().getAir(),
+            config.getAirReplicationConfig().getReplicateMode());
+    this.airReplicationGuardian =
+        config.getAirReplicationConfig().getAir().getAirReplicationGuardian();
+    this.asyncClientManager =
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
+    this.syncClientManager =
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
+  }
+
  /**
   * Starts the consensus layer: triggers (possibly asynchronous) recovery, brings up the RPC
   * service, waits for recovery to finish, then launches the background guardian task.
   *
   * @throws IOException if the RPC service cannot be registered
   */
  @Override
  public synchronized void start() throws IOException {
    // Kick off recovery first; it may run asynchronously (see initAndRecover).
    Future<Void> recoverFuture = initAndRecover();

    // Register the RPC endpoint before blocking on recovery so peers can already reach this node.
    rpcService.initSyncedServiceImpl(new AirReplicationRPCServiceProcessor(this, config.getAir()));
    try {
      registerManager.register(rpcService);
    } catch (StartupException e) {
      throw new IOException(e);
    }

    try {
      recoverFuture.get();
    } catch (CancellationException ce) {
      LOGGER.info("IoTV2 Recover Task is cancelled", ce);
    } catch (ExecutionException ee) {
      // Recovery failures are logged, not rethrown: the guardian below cleans up stragglers.
      LOGGER.error("Exception while waiting for recover future completion", ee);
    } catch (InterruptedException ie) {
      // Restore the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
      LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
    }
    // only when we recover all replication group can we launch async backend checker thread
    airReplicationGuardian.start(
        REPLICATION_AIR_GUARDIAN_TASK_ID,
        this::checkAllAirReplication,
        config.getAir().getAirReplicationGuardJobIntervalInSeconds());
  }
+
+  private Future<Void> initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      // init
+      if (!storageDir.mkdirs()) {
+        LOGGER.warn("Unable to create replication dir at {}", storageDir);
+        throw new IOException(String.format("Unable to create replication dir 
at %s", storageDir));
+      }
+      return CompletableFuture.completedFuture(null);
+    } else {
+      // asynchronously recover, retry logic is implemented at 
AirReplicationImpl
+      return CompletableFuture.runAsync(
+              () -> {
+                try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
+                  for (Path path : stream) {
+                    ConsensusGroupId consensusGroupId =
+                        parsePeerFileName(path.getFileName().toString());
+                    try {
+                      AirReplicationServerImpl replication =
+                          new AirReplicationServerImpl(
+                              new Peer(consensusGroupId, thisNodeId, thisNode),
+                              registry.apply(consensusGroupId),
+                              new ArrayList<>(),
+                              config,
+                              airReplicationManager,
+                              syncClientManager);
+                      stateMachineMap.put(consensusGroupId, replication);
+                      checkPeerListAndStartIfEligible(consensusGroupId, 
replication);
+                    } catch (Exception e) {
+                      LOGGER.error(
+                          "Failed to recover replication from {} for {}, 
ignore it and continue recover other group, async backend checker thread will 
automatically deregister related air side effects for this failed replication 
group.",
+                          storageDir,
+                          consensusGroupId,
+                          e);
+                    }
+                  }
+                } catch (IOException e) {
+                  LOGGER.error(
+                      "Failed to recover replication from {} because read dir 
failed", storageDir, e);
+                }
+              })
+          .exceptionally(
+              e -> {
+                LOGGER.error("Failed to recover replication from {}", 
storageDir, e);
+                return null;
+              });
+    }
+  }
+
+  private void checkPeerListAndStartIfEligible(
+      ConsensusGroupId consensusGroupId, AirReplicationServerImpl replication) 
throws IOException {
+    BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+        (dataRegionId, peers) -> {
+          try {
+            resetPeerList(dataRegionId, peers);
+          } catch (ConsensusGroupNotExistException ignore) {
+
+          } catch (Exception e) {
+            LOGGER.warn("Failed to reset peer list while start", e);
+          }
+        };
+
+    if (correctPeerListBeforeStart != null) {
+      if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
+        // make peers which are in list correct
+        resetPeerListWithoutThrow.accept(
+            consensusGroupId, 
correctPeerListBeforeStart.get(consensusGroupId));
+        replication.start(true);
+      } else {
+        // clear peers which are not in the list
+        resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList());
+      }
+
+    } else {
+      replication.start(true);
+    }
+  }
+
  /**
   * Stops the consensus layer. The order is significant: client managers and the RPC service go
   * down first, then the guardian, then every group's state machine, and finally the shared
   * background task service.
   */
  @Override
  public synchronized void stop() {
    asyncClientManager.close();
    syncClientManager.close();
    registerManager.deregisterAll();
    airReplicationGuardian.stop();
    // Groups stop independently, so a parallel stream shortens shutdown.
    stateMachineMap.values().parallelStream().forEach(AirReplicationServerImpl::stop);
    IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
  }
+
+  private void checkAllAirReplication() {
+    final Map<ConsensusGroupId, Map<AirReplicationName, PipeStatus>> 
existedAirs =
+        airReplicationManager.getAllAirReplication().entrySet().stream()
+            .filter(entry -> entry.getKey().getSenderDataNodeId() == 
thisNodeId)
+            .collect(
+                Collectors.groupingBy(
+                    entry -> entry.getKey().getConsensusGroupId(),
+                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    stateMachineMapLock.writeLock().lock();
+    try {
+      stateMachineMap.forEach(
+          (key, value) ->
+              value.checkAirReplication(existedAirs.getOrDefault(key, 
ImmutableMap.of())));
+      existedAirs.entrySet().stream()
+          .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
+          .flatMap(entry -> entry.getValue().keySet().stream())
+          .forEach(
+              airReplicationName -> {
+                try {
+                  LOGGER.warn(
+                      "{} drop air replication [{}]",
+                      airReplicationName.getConsensusGroupId(),
+                      airReplicationName);
+                  
airReplicationManager.updateAirReplication(airReplicationName, 
PipeStatus.DROPPED);
+                } catch (Exception e) {
+                  LOGGER.warn(
+                      "{} cannot drop air replication [{}]",
+                      airReplicationName.getConsensusGroupId(),
+                      airReplicationName,
+                      e);
+                }
+              });
+    } finally {
+      stateMachineMapLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    final AirReplicationServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.isReadOnly()) {
+      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
+    } else if (!impl.isActive()) {
+      return RpcUtils.getStatus(
+          TSStatusCode.WRITE_PROCESS_REJECT,
+          "current node is not active and is not ready to receive user 
write.");
+    } else {
+      return impl.write(request);
+    }
+  }
+
+  @Override
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
+        .read(request);
+  }
+
  /**
   * Returns the on-disk directory for a group: {@code <storageDir>/<typeValue>_<groupId>}.
   * Must stay the inverse of {@link #parsePeerFileName(String)}.
   */
  private String getPeerDir(ConsensusGroupId groupId) {
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
  }
+
+  private ConsensusGroupId parsePeerFileName(String fileName) {
+    String[] items = fileName.split("_");
+    return ConsensusGroupId.Factory.create(Integer.parseInt(items[0]), 
Integer.parseInt(items[1]));
+  }
+
+  @Override
+  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
+    final int replicationGroupSize = peers.size();
+    if (replicationGroupSize == 0) {
+      throw new IllegalPeerNumException(replicationGroupSize);
+    }
+    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
+      throw new IllegalPeerEndpointException(thisNode, peers);
+    }
+
+    Lock lock =
+        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new 
ReentrantLock());
+    lock.lock();
+    try {
+      stateMachineMapLock.readLock().lock();
+      try {
+        if (stateMachineMap.containsKey(groupId)) {
+          throw new ConsensusGroupAlreadyExistException(groupId);
+        }
+
+        final String path = getPeerDir(groupId);
+        File replicationDir = new File(path);
+        if (!replicationDir.exists() && !replicationDir.mkdirs()) {
+          LOGGER.warn("Unable to create replication dir for group {} at {}", 
groupId, path);
+          throw new ConsensusException(
+              String.format("Unable to create replication dir for group %s", 
groupId));
+        }
+
+        AirReplicationServerImpl replication =
+            new AirReplicationServerImpl(
+                new Peer(groupId, thisNodeId, thisNode),
+                registry.apply(groupId),
+                peers,
+                config,
+                airReplicationManager,
+                syncClientManager);
+        stateMachineMap.put(groupId, replication);
+        replication.start(false); // air will start after creating
+        
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
+      } catch (IOException e) {
+        LOGGER.warn("Cannot create local peer for group {} with peers {}", 
groupId, peers, e);
+        throw new ConsensusException(e);
+      } finally {
+        stateMachineMapLock.readLock().unlock();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
  /**
   * Deletes the local peer of a group: clears its state machine, unregisters it, and removes its
   * on-disk directory.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
    // Per-group lock serializes create/delete of the same group.
    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (!stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupNotExistException(groupId);
        }
        LOGGER.info("[{}] start to delete local peer for group {}", CLASS_NAME, groupId);
        final AirReplicationServerImpl replication = stateMachineMap.get(groupId);
        // Release group resources before unregistering and deleting the directory.
        replication.clear();
        stateMachineMap.remove(groupId);

        FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
        KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
        LOGGER.info("[{}] finish deleting local peer for group {}", CLASS_NAME, groupId);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
      // NOTE(review): the per-group lock is discarded here while another thread could still be
      // blocked on the same lock object in createLocalPeer — confirm that interleaving is benign.
      replicationGroupIdReentrantLockMap.remove(groupId);
    }
  }
+
  /**
   * Coordinates adding a remote peer to a group in four ordered steps: deactivate the new peer,
   * have existing peers build air replications to it, wait for transfer completion, then activate
   * it. On failure the created replications are rolled back best-effort.
   *
   * @throws ConsensusException if the group is missing, the peer already belongs to it, or any
   *     step fails
   */
  @Override
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.containsPeer(peer)) {
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
    }
    try {
      // step 1: inactive new Peer to prepare for following steps
      LOGGER.info("[{}] inactivate new peer: {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, false);

      // step 2: notify all the other Peers to create air replications to newPeer
      // NOTE: For this step, all the other peers will try to transfer its user write data to target
      LOGGER.info("[{}] notify current peers to create air replications...", CLASS_NAME);
      impl.notifyPeersToCreateAirReplications(peer);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

      // step 3: wait until all other Peers finish transferring
      LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
      impl.waitPeersToTargetPeerTransmissionCompleted(peer);

      // step 4: active new Peer to let new Peer receive client requests
      LOGGER.info("[{}] activate new peer...", CLASS_NAME);
      impl.setRemotePeerActive(peer, true, false);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
    } catch (ConsensusGroupModifyPeerException e) {
      try {
        LOGGER.warn(
            "[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME, e);

        // roll back: tear down the replications created in step 2
        impl.notifyPeersToDropAirReplication(peer);

      } catch (ConsensusGroupModifyPeerException mpe) {
        // Cleanup is best-effort; the guardian job will deregister leftovers later.
        LOGGER.error(
            "[{}] failed to cleanup side effects after failed to add remote peer", CLASS_NAME, mpe);
      }
      throw new ConsensusException(e);
    }
  }
+
  /**
   * Coordinates removing a remote peer from a group: drop replications targeting it, deactivate
   * it, wait for its outstanding transfers to finish, then wait for it to release its resources.
   *
   * @throws ConsensusException if the group is missing, the peer is not in it, or any step fails
   */
  @Override
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (!impl.containsPeer(peer)) {
      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

    try {
      // let other peers to drop air replications to target
      LOGGER.info("[{}] notify other peers to drop air replications...", CLASS_NAME);
      impl.notifyPeersToDropAirReplication(peer);
      KillPoint.setKillPoint(
          IoTConsensusRemovePeerCoordinatorKillPoints
              .AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);

      // let target peer reject new write
      LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, true);
      KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);

      // wait its air replications to complete
      LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME, peer);
      impl.waitTargetPeerToPeersTransmissionCompleted(peer);

      // wait target peer to release all resource
      LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
      impl.waitReleaseAllRegionRelatedResource(peer);
    } catch (ConsensusGroupModifyPeerException e) {
      throw new ConsensusException(e);
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
  }
+
  /**
   * Records the authoritative peer list per group before {@link #start()} runs; recovery uses it
   * in checkPeerListAndStartIfEligible to correct or clear each group's peers.
   */
  @Override
  public void recordCorrectPeerListBeforeStarting(
      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
    LOGGER.info("Record correct peer list: {}", correctPeerList);
    this.correctPeerListBeforeStart = correctPeerList;
  }
+
+  @Override
+  public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
+      throws ConsensusException {
+    AirReplicationServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
+    if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
+      LOGGER.warn(
+          "[RESET PEER LIST] {} Local peer is not in the correct 
configuration, delete it.",
+          groupId);
+      deleteLocalPeer(groupId);
+      return;
+    }
+
+    ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
+    String previousPeerListStr = impl.getPeers().toString();
+    // remove invalid peer
+    for (Peer peer : currentPeers) {
+      if (!correctPeers.contains(peer)) {
+        try {
+          impl.dropAirReplicationToTargetPeer(peer);
+          LOGGER.info("[RESET PEER LIST] {} Remove sync channel with: {}", 
groupId, peer);
+        } catch (ConsensusGroupModifyPeerException e) {
+          LOGGER.error(
+              "[RESET PEER LIST] {} Failed to remove sync channel with: {}", 
groupId, peer, e);
+        }
+      }
+    }
+    // add correct peer
+    for (Peer peer : correctPeers) {
+      if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
+        try {
+          impl.createAirReplicationToTargetPeer(peer, false);
+          LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", 
groupId, peer);
+        } catch (ConsensusGroupModifyPeerException e) {
+          LOGGER.warn(
+              "[RESET PEER LIST] {} Failed to build sync channel with: {}", 
groupId, peer, e);
+        }
+      }
+    }
+    // show result
+    String currentPeerListStr = impl.getPeers().toString();
+    if (!previousPeerListStr.equals(currentPeerListStr)) {
+      LOGGER.info(
+          "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
+          groupId,
+          previousPeerListStr,
+          impl.getPeers());
+    } else {
+      LOGGER.info(
+          "[RESET PEER LIST] {} The current peer list is correct, nothing need 
to be reset: {}",
+          groupId,
+          previousPeerListStr);
+    }
+  }
+
  /**
   * Unsupported: leader transfer is meaningless here — see {@link #isLeader} which reports every
   * node as leader.
   *
   * @throws ConsensusException always
   */
  @Override
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
    throw new ConsensusException(String.format("%s does not support leader transfer", CLASS_NAME));
  }
+
+  @Override
+  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws 
ConsensusException {
+    if (!stateMachineMap.containsKey(groupId)) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+    // Do nothing here because we do not need to transfer snapshot when there 
are new peers
+  }
+
  /** Every replica accepts writes in this protocol, so every node reports itself as leader. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }
+
  /** Always 0 for now — a real logical clock is not implemented yet. */
  @Override
  public long getLogicalClock(ConsensusGroupId groupId) {
    // TODO: check logical clock
    return 0;
  }
+
  /** Always ready — consistent with {@link #isLeader} treating every node as leader. */
  @Override
  public boolean isLeaderReady(ConsensusGroupId groupId) {
    return true;
  }
+
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    if (!stateMachineMap.containsKey(groupId)) {
+      return null;
+    }
+    return new Peer(groupId, thisNodeId, thisNode);
+  }
+
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    AirReplicationServerImpl impl = stateMachineMap.get(groupId);
+    return impl != null ? impl.getPeers().size() : 0;
+  }
+
  /** Returns a snapshot copy of all locally registered group ids (safe to mutate). */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }
+
  /** Exposes the group's storage directory path; delegates to {@link #getPeerDir}. */
  @Override
  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
    return getPeerDir(groupId);
  }
+
  /** Intentional no-op — see the inline note for where reloads are handled. */
  @Override
  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    // AirReplication doesn't support reload consensus config, related config can be reloaded in
    // iotdb-core layer.
  }
+
  /** Returns the group's server implementation, or {@code null} when not registered locally. */
  public AirReplicationServerImpl getImpl(ConsensusGroupId groupId) {
    return stateMachineMap.get(groupId);
  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
new file mode 100644
index 00000000000..d8e467a642f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.consensus.common.Peer;
+
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AirReplicationPeerManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AirReplicationPeerManager.class);
+
+  private final Set<Peer> peers;
+
+  public AirReplicationPeerManager(List<Peer> peers) {
+    this.peers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    this.peers.addAll(peers);
+    if (this.peers.size() != peers.size()) {
+      LOGGER.warn("Duplicate peers in the input list, ignore the duplicates.");
+    }
+  }
+
+  public boolean contains(Peer peer) {
+    return peers.contains(peer);
+  }
+
+  public void addPeer(Peer peer) {
+    peers.add(peer);
+  }
+
+  public void removePeer(Peer peer) {
+    peers.remove(peer);
+  }
+
+  public List<Peer> getOtherPeers(Peer thisNode) {
+    return peers.stream()
+        .filter(peer -> !peer.equals(thisNode))
+        .collect(ImmutableList.toImmutableList());
+  }
+
+  public List<Peer> getPeers() {
+    return ImmutableList.copyOf(peers);
+  }
+
+  public void clear() {
+    peers.clear();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
new file mode 100644
index 00000000000..783c98d50f0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig.ReplicateMode;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.airreplication.ReplicateProgressManager;
+import org.apache.iotdb.consensus.air.metric.AirReplicationServerMetrics;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedReq;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationResp;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
/** AirReplicationServerImpl is a replication server implementation for air replication. */
public class AirReplicationServerImpl {
  private static final Logger LOGGER = LoggerFactory.getLogger(AirReplicationServerImpl.class);
  // Polling interval used by the waitXxxTransmissionCompleted loops.
  private static final long CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
      PerformanceOverviewMetrics.getInstance();
  // Retry policy for starting air replications in start().
  private static final long RETRY_WAIT_TIME_IN_MS = 500;
  private static final long MAX_RETRY_TIMES = 20;
  // The local peer; also identifies the consensus group this server belongs to.
  private final Peer thisNode;
  private final IStateMachine stateMachine;
  // Serializes all writes into the state machine (see write/writeOnFollowerReplica).
  private final Lock stateMachineLock = new ReentrantLock();
  private final AirReplicationPeerManager peerManager;
  // Whether this peer is considered active by the coordinator (see setActive).
  private final AtomicBoolean active;
  // Whether start() has completed and stop()/clear() has not been called since.
  private final AtomicBoolean isStarted;
  private final String consensusGroupId;
  private final AirReplicationManager airReplicationManager;
  private final ReplicateProgressManager replicateProgressManager;
  private final IClientManager<TEndPoint, SyncAirReplicationServiceClient> syncClientManager;
  private final AirReplicationServerMetrics airReplicationServerMetrics;
  private final ReplicateMode replicateMode;

  // High-water progress index cached by isAirReplicationsTransmissionCompleted; only mutated
  // under that method's synchronization.
  private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
+
  /**
   * Builds the replication server for one consensus-group member.
   *
   * <p>When {@code peers} is non-empty, air replications from this node to every other peer are
   * created eagerly; if any creation fails, the successfully created ones are rolled back
   * (dropped) and an {@link IOException} is thrown.
   *
   * @param thisNode the local peer (its group id also names the consensus group)
   * @param stateMachine state machine that applies writes and serves reads
   * @param peers initial peer list; may be empty, in which case `resetPeerList` is expected to
   *     fetch the correct peers from the ConfigNode later (per the comment below)
   * @param config supplies the progress-index manager and the replicate mode
   * @param airReplicationManager used to create/update/drop air replications
   * @param syncClientManager client manager for synchronous RPCs to other peers
   * @throws IOException if not all air replications could be created
   */
  public AirReplicationServerImpl(
      Peer thisNode,
      IStateMachine stateMachine,
      List<Peer> peers,
      AirReplicationConfig config,
      AirReplicationManager airReplicationManager,
      IClientManager<TEndPoint, SyncAirReplicationServiceClient> syncClientManager)
      throws IOException {
    this.thisNode = thisNode;
    this.stateMachine = stateMachine;
    this.peerManager = new AirReplicationPeerManager(peers);
    this.active = new AtomicBoolean(true);
    this.isStarted = new AtomicBoolean(false);
    this.consensusGroupId = thisNode.getGroupId().toString();
    this.airReplicationManager = airReplicationManager;
    this.replicateProgressManager = config.getAir().getProgressIndexManager();
    this.syncClientManager = syncClientManager;
    this.airReplicationServerMetrics = new AirReplicationServerMetrics(this);
    this.replicateMode = config.getReplicateMode();

    // if peers is empty, the `resetPeerList` will automatically fetch correct peers' info from CN.
    if (!peers.isEmpty()) {
      // create air replications to every other peer
      Set<Peer> deepCopyPeersWithoutSelf =
          peers.stream().filter(peer -> !peer.equals(thisNode)).collect(Collectors.toSet());
      final List<Peer> successfulAirs = createAirReplications(deepCopyPeersWithoutSelf);
      if (successfulAirs.size() < deepCopyPeersWithoutSelf.size()) {
        // roll back: drop the replications that were created before the failure
        updateAirReplicationsStatus(successfulAirs, PipeStatus.DROPPED);
        throw new IOException(String.format("%s cannot create all air replications", thisNode));
      }
    }
  }
+
+  @SuppressWarnings("java:S2276")
+  public synchronized void start(boolean startConsensusPipes) throws 
IOException {
+    stateMachine.start();
+    MetricService.getInstance().addMetricSet(this.airReplicationServerMetrics);
+
+    if (startConsensusPipes) {
+      // start all air replications
+      final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+      List<Peer> failedAirs =
+          updateAirReplicationsStatus(new ArrayList<>(otherPeers), 
PipeStatus.RUNNING);
+      // considering procedure can easily time out, keep trying 
updateAirReplicationsStatus until all
+      // air replications are started gracefully or exceed the maximum number 
of attempts.
+      // NOTE: start air procedure is idempotent guaranteed.
+      try {
+        for (int i = 0; i < MAX_RETRY_TIMES && !failedAirs.isEmpty(); i++) {
+          failedAirs = updateAirReplicationsStatus(failedAirs, 
PipeStatus.RUNNING);
+          Thread.sleep(RETRY_WAIT_TIME_IN_MS);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "AirReplicationImpl-peer{}: airReplicationImpl thread get 
interrupted when start air replication. May because IoTDB process is killed.",
+            thisNode);
+        throw new IOException(String.format("%s cannot start all air 
replications", thisNode));
+      }
+      // if there still are some air replications failed to start, throw an 
exception.
+      if (!failedAirs.isEmpty()) {
+        // roll back
+        List<Peer> successfulAirs = new ArrayList<>(otherPeers);
+        successfulAirs.removeAll(failedAirs);
+        updateAirReplicationsStatus(successfulAirs, PipeStatus.STOPPED);
+        throw new IOException(String.format("%s cannot start all air 
replications", thisNode));
+      }
+    }
+    isStarted.set(true);
+  }
+
  /**
   * Stops this server: first stops the air replications towards other peers (best-effort — a
   * failure is only logged), then deregisters metrics and stops the state machine. The order
   * matters: replication traffic is quiesced before the state machine shuts down.
   */
  public synchronized void stop() {
    // stop all air replications
    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
    final List<Peer> failedAirs =
        updateAirReplicationsStatus(new ArrayList<>(otherPeers), PipeStatus.STOPPED);
    if (!failedAirs.isEmpty()) {
      // do not roll back, because it will stop anyway
      LOGGER.warn("{} cannot stop all air replications", thisNode);
    }
    MetricService.getInstance().removeMetricSet(this.airReplicationServerMetrics);
    stateMachine.stop();
    isStarted.set(false);
  }
+
  /**
   * Tears this server down permanently: drops all air replications to other peers (best-effort),
   * deregisters metrics, empties the peer set, stops the state machine, and marks this server as
   * neither started nor active. Unlike {@link #stop()}, membership state is discarded.
   */
  public synchronized void clear() {
    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
    final List<Peer> failedAirs =
        updateAirReplicationsStatus(new ArrayList<>(otherPeers), PipeStatus.DROPPED);
    if (!failedAirs.isEmpty()) {
      // do not roll back, because it will clear anyway
      LOGGER.warn("{} cannot drop all air replications", thisNode);
    }

    MetricService.getInstance().removeMetricSet(this.airReplicationServerMetrics);
    peerManager.clear();
    stateMachine.stop();
    isStarted.set(false);
    active.set(false);
  }
+
+  private List<Peer> createAirReplications(Set<Peer> peers) {
+    return peers.stream()
+        .filter(
+            peer -> {
+              try {
+                if (!peers.equals(thisNode)) {
+                  airReplicationManager.createAirReplication(thisNode, peer);
+                }
+                return true;
+              } catch (Exception e) {
+                LOGGER.warn(
+                    "{}: cannot create air replication between {} and {}",
+                    e.getMessage(),
+                    thisNode,
+                    peer,
+                    e);
+                return false;
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * update given air replications' status, returns the peer corresponding to 
the air that failed to
+   * update
+   */
+  private List<Peer> updateAirReplicationsStatus(List<Peer> peers, PipeStatus 
status) {
+    return peers.stream()
+        .filter(
+            peer -> {
+              try {
+                if (!peer.equals(thisNode)) {
+                  airReplicationManager.updateAirReplication(
+                      new AirReplicationName(thisNode, peer), status);
+                }
+                return false;
+              } catch (Exception e) {
+                LOGGER.warn(
+                    "{}: cannot update air replication between {} and {} to 
status {}",
+                    e.getMessage(),
+                    thisNode,
+                    peer,
+                    status);
+                return true;
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
  /**
   * Reconciles the set of air replications that actually exist ({@code existedAirs}) with the set
   * that should exist given the current peer list and started/stopped state.
   *
   * <p>Replications that exist but should not are dropped; replications whose status differs from
   * the expected one are updated (except the transition to RUNNING, which is delegated to the Air
   * framework's metaSync — see the inline comment); replications that should exist but do not are
   * created and set to the expected status. Every failure is logged and tolerated.
   *
   * @param existedAirs the currently existing replications and their statuses
   */
  public synchronized void checkAirReplication(Map<AirReplicationName, PipeStatus> existedAirs) {
    // expected status follows this server's lifecycle state
    final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING : PipeStatus.STOPPED;
    final Map<AirReplicationName, Peer> expectedAirs =
        peerManager.getOtherPeers(thisNode).stream()
            .collect(
                ImmutableMap.toImmutableMap(
                    peer -> new AirReplicationName(thisNode, peer), peer -> peer));

    // Pass 1: drop stale replications, fix statuses of the ones that should stay.
    existedAirs.forEach(
        (existedName, existedStatus) -> {
          if (!expectedAirs.containsKey(existedName)) {
            try {
              LOGGER.warn("{} drop air replication [{}]", consensusGroupId, existedName);
              airReplicationManager.updateAirReplication(existedName, PipeStatus.DROPPED);
            } catch (Exception e) {
              LOGGER.warn("{} cannot drop air replication [{}]", consensusGroupId, existedName, e);
            }
          } else if (!expectedStatus.equals(existedStatus)) {
            try {
              LOGGER.warn(
                  "{} update air replication [{}] to status {}",
                  consensusGroupId,
                  existedName,
                  expectedStatus);
              if (expectedStatus.equals(PipeStatus.RUNNING)) {
                // Do nothing. Because Air framework's metaSync will do that.
                return;
              }
              airReplicationManager.updateAirReplication(existedName, expectedStatus);
            } catch (Exception e) {
              LOGGER.warn(
                  "{} cannot update air replication [{}] to status {}",
                  consensusGroupId,
                  existedName,
                  expectedStatus,
                  e);
            }
          }
        });

    // Pass 2: create any missing replications and bring them to the expected status.
    expectedAirs.forEach(
        (expectedName, expectedPeer) -> {
          if (!existedAirs.containsKey(expectedName)) {
            try {
              LOGGER.warn(
                  "{} create and update air replication [{}] to status {}",
                  consensusGroupId,
                  expectedName,
                  expectedStatus);
              airReplicationManager.createAirReplication(thisNode, expectedPeer);
              airReplicationManager.updateAirReplication(expectedName, expectedStatus);
            } catch (Exception e) {
              LOGGER.warn(
                  "{} cannot create and update air replication [{}] to status {}",
                  consensusGroupId,
                  expectedName,
                  expectedStatus,
                  e);
            }
          }
        });
  }
+
+  public TSStatus write(IConsensusRequest request) {
+    stateMachineLock.lock();
+    try {
+      long consensusWriteStartTime = System.nanoTime();
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      airReplicationServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
+      long writeToStateMachineStartTime = System.nanoTime();
+      if (request instanceof ComparableConsensusRequest) {
+        ((ComparableConsensusRequest) request)
+            
.setProgressIndex(replicateProgressManager.assignProgressIndex(thisNode.getGroupId()));
+      }
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+      PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      // statistic the time of writing request into stateMachine
+      airReplicationServerMetrics.recordUserWriteStateMachineTime(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      return result;
+    } finally {
+      stateMachineLock.unlock();
+    }
+  }
+
+  public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
+    stateMachineLock.lock();
+    try {
+      long consensusWriteStartTime = System.nanoTime();
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      airReplicationServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
+
+      long writeToStateMachineStartTime = System.nanoTime();
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+
+      PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      // statistic the time of writing request into stateMachine
+      airReplicationServerMetrics.recordReplicaWriteStateMachineTime(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      return result;
+    } finally {
+      stateMachineLock.unlock();
+    }
+  }
+
  /** Serves a read directly from the local state machine; no lock is taken and no replication is involved. */
  public DataSet read(IConsensusRequest request) {
    return stateMachine.read(request);
  }
+
  /**
   * Asks a remote peer (via sync RPC) to flip its active flag.
   *
   * <p>If the client cannot even be borrowed: during a removal the error is only logged (the
   * target may legitimately be down already), while during an addition it is rethrown so the
   * add-peer procedure fails.
   *
   * @param peer the remote peer to update
   * @param isActive the desired active state
   * @param isForDeletionPurpose true when called as part of a remove-peer procedure
   * @throws ConsensusGroupModifyPeerException if the RPC fails or returns a non-success status
   */
  public void setRemotePeerActive(Peer peer, boolean isActive, boolean isForDeletionPurpose)
      throws ConsensusGroupModifyPeerException {
    try (SyncAirReplicationServiceClient client =
        syncClientManager.borrowClient(peer.getEndpoint())) {
      try {
        TSetActiveResp res =
            client.setActive(
                new TSetActiveReq(
                    peer.getGroupId().convertToTConsensusGroupId(),
                    isActive,
                    isForDeletionPurpose));
        if (!RpcUtils.SUCCESS_STATUS.equals(res.getStatus())) {
          throw new ConsensusGroupModifyPeerException(
              String.format(
                  "error when set peer %s to active %s. result status: %s",
                  peer, isActive, res.getStatus()));
        }
      } catch (Exception e) {
        throw new ConsensusGroupModifyPeerException(
            String.format("error when set peer %s to active %s", peer, isActive), e);
      }
    } catch (ClientManagerException e) {
      if (isForDeletionPurpose) {
        // for remove peer, if target peer is already down, we can skip this step.
        LOGGER.warn(
            "target peer may be down, error when set peer {} to active {}", peer, isActive, e);
      } else {
        // for add peer, if target peer is down, we need to throw exception to identify the failure
        // of this addPeerProcedure.
        throw new ConsensusGroupModifyPeerException(e);
      }
    }
  }
+
  /**
   * Notifies every other peer (best-effort, failures only logged) to create an air replication to
   * {@code targetPeer}, then creates this node's own replication to the target. This node acts as
   * coordinator and will transfer the complete historical snapshot; only the local creation
   * failure is fatal.
   *
   * @param targetPeer the peer being added to the group
   * @throws ConsensusGroupModifyPeerException if this node's own replication to the target cannot
   *     be created
   */
  public void notifyPeersToCreateAirReplications(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
    for (Peer peer : otherPeers) {
      if (peer.equals(targetPeer)) {
        continue;
      }
      try (SyncAirReplicationServiceClient client =
          syncClientManager.borrowClient(peer.getEndpoint())) {
        TNotifyPeerToCreateAirReplicationResp resp =
            client.notifyPeerToCreateAirReplication(
                new TNotifyPeerToCreateAirReplicationReq(
                    targetPeer.getGroupId().convertToTConsensusGroupId(),
                    targetPeer.getEndpoint(),
                    targetPeer.getNodeId()));
        if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
          throw new ConsensusGroupModifyPeerException(
              String.format("error when notify peer %s to create air replication", peer));
        }
      } catch (Exception e) {
        // best-effort: the exception (including the one thrown just above) is only logged
        LOGGER.warn(
            "{} cannot notify peer {} to create air replication, may because that peer is unknown currently, please manually check!",
            thisNode,
            peer,
            e);
      }
    }

    try {
      // This node which acts as coordinator will transfer complete historical snapshot to new
      // target.
      createAirReplicationToTargetPeer(targetPeer, false);
    } catch (Exception e) {
      LOGGER.warn(
          "{} cannot create air replication to {}, may because target peer is unknown currently, please manually check!",
          thisNode,
          targetPeer,
          e);
      throw new ConsensusGroupModifyPeerException(e);
    }
  }
+
  /**
   * Creates this node's air replication to {@code targetPeer} and registers the target in the
   * peer set. The kill point is for fault-injection tests of the add-peer procedure.
   *
   * @param targetPeer the peer to replicate to
   * @param needManuallyStart whether the new replication must be started manually later
   * @throws ConsensusGroupModifyPeerException if the replication cannot be created
   */
  public synchronized void createAirReplicationToTargetPeer(
      Peer targetPeer, boolean needManuallyStart) throws ConsensusGroupModifyPeerException {
    try {
      KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
      airReplicationManager.createAirReplication(thisNode, targetPeer, needManuallyStart);
      peerManager.addPeer(targetPeer);
    } catch (Exception e) {
      LOGGER.warn("{} cannot create air replication to {}", thisNode, targetPeer, e);
      throw new ConsensusGroupModifyPeerException(
          String.format("%s cannot create air replication to %s", thisNode, targetPeer), e);
    }
  }
+
  /**
   * Notifies every other peer (best-effort, failures only logged) to drop its air replication to
   * {@code targetPeer}, then drops this node's own replication to the target. Only the local drop
   * failure is fatal.
   *
   * @param targetPeer the peer being removed from the group
   * @throws ConsensusGroupModifyPeerException if this node's own replication to the target cannot
   *     be dropped
   */
  public void notifyPeersToDropAirReplication(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
    for (Peer peer : otherPeers) {
      if (peer.equals(targetPeer)) {
        continue;
      }
      try (SyncAirReplicationServiceClient client =
          syncClientManager.borrowClient(peer.getEndpoint())) {
        TNotifyPeerToDropAirReplicationResp resp =
            client.notifyPeerToDropAirReplication(
                new TNotifyPeerToDropAirReplicationReq(
                    targetPeer.getGroupId().convertToTConsensusGroupId(),
                    targetPeer.getEndpoint(),
                    targetPeer.getNodeId()));
        if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
          throw new ConsensusGroupModifyPeerException(
              String.format("error when notify peer %s to drop air replication", peer));
        }
      } catch (Exception e) {
        // best-effort: the exception (including the one thrown just above) is only logged
        LOGGER.warn(
            "{} cannot notify peer {} to drop air replication, may because that peer is unknown currently, please manually check!",
            thisNode,
            peer,
            e);
      }
    }

    try {
      dropAirReplicationToTargetPeer(targetPeer);
    } catch (Exception e) {
      LOGGER.warn(
          "{} cannot drop air replication to {}, may because target peer is unknown currently, please manually check!",
          thisNode,
          targetPeer,
          e);
      throw new ConsensusGroupModifyPeerException(e);
    }
  }
+
  /**
   * Drops this node's air replication to {@code targetPeer} and removes the target from the peer
   * set.
   *
   * @param targetPeer the peer whose replication is being removed
   * @throws ConsensusGroupModifyPeerException if the replication cannot be dropped
   */
  public synchronized void dropAirReplicationToTargetPeer(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    try {
      airReplicationManager.dropAirReplication(thisNode, targetPeer);
      peerManager.removePeer(targetPeer);
    } catch (Exception e) {
      LOGGER.warn("{} cannot drop air replication to {}", thisNode, targetPeer, e);
      throw new ConsensusGroupModifyPeerException(
          String.format("%s cannot drop air replication to %s", thisNode, targetPeer), e);
    }
  }
+
+  public void startOtherAirReplicationsToTargetPeer(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+    for (Peer peer : otherPeers) {
+      if (peer.equals(targetPeer)) {
+        continue;
+      }
+      try {
+        airReplicationManager.updateAirReplication(
+            new AirReplicationName(peer, targetPeer), PipeStatus.RUNNING);
+      } catch (Exception e) {
+        // just warn but not throw exceptions. Because there may exist unknown 
nodes in replication
+        // group
+        LOGGER.warn("{} cannot start air replication to {}", peer, targetPeer, 
e);
+      }
+    }
+  }
+
  /** Wait for the user written data up to firstCheck to be replicated */
  public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    boolean isTransmissionCompleted = false;
    // "first check" flags make the remote side refresh its cached progress index once
    boolean isFirstCheckForCurrentPeer = true;
    boolean isFirstCheckForOtherPeers = true;

    try {
      // Poll until this node's replication to the target AND every other peer's replication to
      // the target report completion.
      while (!isTransmissionCompleted) {
        Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);

        if (isAirReplicationsTransmissionCompleted(
            Collections.singletonList(new AirReplicationName(thisNode, targetPeer).toString()),
            isFirstCheckForCurrentPeer)) {
          final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);

          isTransmissionCompleted = true;
          for (Peer peer : otherPeers) {
            if (!peer.equals(targetPeer)) {
              // remains true only if every remote peer also reports completion
              isTransmissionCompleted &=
                  isRemotePeerAirReplicationsTransmissionCompleted(
                      peer,
                      Collections.singletonList(new AirReplicationName(peer, targetPeer).toString()),
                      isFirstCheckForOtherPeers);
            }
          }
          isFirstCheckForOtherPeers = false;
        }
        isFirstCheckForCurrentPeer = false;
      }
    } catch (InterruptedException e) {
      LOGGER.warn("{} is interrupted when waiting for transfer completed", thisNode, e);
      Thread.currentThread().interrupt();
      throw new ConsensusGroupModifyPeerException(
          String.format("%s is interrupted when waiting for transfer completed", thisNode), e);
    }
  }
+
  /** Wait for the user written data up to firstCheck to be replicated */
  public void waitTargetPeerToPeersTransmissionCompleted(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    boolean isTransmissionCompleted = false;
    // first iteration asks the target to refresh its cached progress index
    boolean isFirstCheck = true;

    try {
      // Poll the target peer until its replications towards every other group member report
      // completion.
      while (!isTransmissionCompleted) {
        Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);

        final List<String> airReplicationNames =
            peerManager.getPeers().stream()
                .filter(peer -> !peer.equals(targetPeer))
                .map(peer -> new AirReplicationName(targetPeer, peer).toString())
                .collect(Collectors.toList());
        isTransmissionCompleted =
            isRemotePeerAirReplicationsTransmissionCompleted(
                targetPeer, airReplicationNames, isFirstCheck);

        isFirstCheck = false;
      }
    } catch (InterruptedException e) {
      LOGGER.warn("{} is interrupted when waiting for transfer completed", thisNode, e);
      Thread.currentThread().interrupt();
      throw new ConsensusGroupModifyPeerException(
          String.format("%s is interrupted when waiting for transfer completed", thisNode), e);
    }
  }
+
+  private boolean isRemotePeerAirReplicationsTransmissionCompleted(
+      Peer targetPeer, List<String> airReplicationNames, boolean 
refreshCachedProgressIndex) {
+    try (SyncAirReplicationServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      TCheckAirReplicationCompletedResp resp =
+          client.checkAirReplicationCompleted(
+              new TCheckAirReplicationCompletedReq(
+                  thisNode.getGroupId().convertToTConsensusGroupId(),
+                  airReplicationNames,
+                  refreshCachedProgressIndex));
+      if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
+        LOGGER.warn(
+            "{} cannot check air replications transmission completed to peer 
{}",
+            thisNode,
+            targetPeer);
+        throw new ConsensusGroupModifyPeerException(
+            String.format(
+                "error when check air replications transmission completed to 
peer %s", targetPeer));
+      }
+      return resp.isCompleted;
+    } catch (Exception e) {
+      LOGGER.warn("{} cannot check air replications transmission completed", 
thisNode, e);
+      return true;
+    }
+  }
+
+  public boolean isAirReplicationsTransmissionCompleted(List<String> 
airReplicationNames) {
+    return airReplicationNames.stream()
+        .noneMatch(
+            airName ->
+                replicateProgressManager.getSyncLagForSpecificAirReplication(
+                        thisNode.getGroupId(), new AirReplicationName(airName))
+                    > 0);
+  }
+
  /**
   * Checks whether every named air replication has progressed past the cached high-water progress
   * index. When {@code refreshCachedProgressIndex} is set, the cache is first advanced to the
   * maximum index assigned so far for this group, pinning the point up to which completion is
   * required.
   *
   * @param airReplicationNames the replication names to check
   * @param refreshCachedProgressIndex whether to refresh the cached high-water index first
   * @return true iff no replication is still behind the cached index; false also when a
   *     replication's progress cannot be read (PipeException)
   */
  public synchronized boolean isAirReplicationsTransmissionCompleted(
      List<String> airReplicationNames, boolean refreshCachedProgressIndex) {
    if (refreshCachedProgressIndex) {
      cachedProgressIndex =
          cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
              replicateProgressManager.getMaxAssignedProgressIndex(thisNode.getGroupId()));
    }

    try {
      return airReplicationNames.stream()
          .noneMatch(
              name ->
                  cachedProgressIndex.isAfter(
                      replicateProgressManager.getProgressIndex(new AirReplicationName(name))));
    } catch (PipeException e) {
      // progress not readable yet: report not-completed and let the caller retry
      LOGGER.info(e.getMessage());
      return false;
    }
  }
+
  /**
   * Polls {@code targetPeer} every 10 seconds until it reports that all region-related resources
   * have been released. RPC/client errors are only logged (the target may be down and then there
   * is nothing to wait for); interruption is rethrown.
   *
   * <p>NOTE(review): there is no upper bound on the number of polls, and the borrowed client is
   * held for the whole wait — presumably acceptable for this procedure; confirm against the
   * client manager's pooling semantics.
   *
   * @param targetPeer the peer whose resource release is awaited
   * @throws ConsensusGroupModifyPeerException if the waiting thread is interrupted
   */
  public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
      throws ConsensusGroupModifyPeerException {
    long checkIntervalInMs = 10_000L;
    try (SyncAirReplicationServiceClient client =
        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
      while (true) {
        TWaitReleaseAllRegionRelatedResourceResp res =
            client.waitReleaseAllRegionRelatedResource(
                new TWaitReleaseAllRegionRelatedResourceReq(
                    targetPeer.getGroupId().convertToTConsensusGroupId()));
        if (res.releaseAllResource) {
          LOGGER.info("[WAIT RELEASE] {} has released all region related resource", targetPeer);
          return;
        }
        LOGGER.info("[WAIT RELEASE] {} is still releasing all region related resource", targetPeer);
        Thread.sleep(checkIntervalInMs);
      }
    } catch (ClientManagerException | TException e) {
      // in case of target peer is down or can not serve, we simply skip it.
      LOGGER.warn(
          String.format(
              "error when waiting %s to release all region related resource. %s",
              targetPeer, e.getMessage()),
          e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ConsensusGroupModifyPeerException(
          String.format(
              "thread interrupted when waiting %s to release all region related resource. %s",
              targetPeer, e.getMessage()),
          e);
    }
  }
+
  /** @return whether the state machine has released every resource related to {@code groupId}. */
  public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
    return stateMachine.hasReleaseAllRegionRelatedResource(groupId);
  }

  /** @return whether the underlying state machine is in read-only mode. */
  public boolean isReadOnly() {
    return stateMachine.isReadOnly();
  }

  /** @return whether this peer is currently marked active. */
  public boolean isActive() {
    return active.get();
  }

  /** Flips this peer's active flag (driven by the coordinator via RPC). */
  public void setActive(boolean active) {
    LOGGER.info("set {} active status to {}", this.thisNode, active);
    this.active.set(active);
  }

  /** @return whether {@code peer} is a current member of this group. */
  public boolean containsPeer(Peer peer) {
    return peerManager.contains(peer);
  }

  /** @return an immutable snapshot of the current group members. */
  public List<Peer> getPeers() {
    return peerManager.getPeers();
  }

  /** @return the string form of this group's consensus-group id. */
  public String getConsensusGroupId() {
    return consensusGroupId;
  }

  /**
   * @return a numeric code for the replicate mode: 2 for BATCH, 1 otherwise (presumably the
   *     streaming mode — TODO confirm against ReplicateMode's declared constants).
   */
  public long getReplicateMode() {
    return (replicateMode == ReplicateMode.BATCH) ? 2 : 1;
  }

  /** @return the local peer. */
  public Peer getThisNodePeer() {
    return thisNode;
  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
new file mode 100644
index 00000000000..77b217625b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import 
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
public class AirReplication implements IConsensus {
  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = "replication_air_guardian";
  private static final String CLASS_NAME = AirReplication.class.getSimpleName();
  private static final Logger LOGGER = LoggerFactory.getLogger(AirReplication.class);

  private final TEndPoint thisNode;
  private final int thisNodeId;
  private final File storageDir;
  private final IStateMachine.Registry registry;
  // One replication server per consensus group hosted on this node.
  private final Map<ConsensusGroupId, AirReplicationServerImpl> stateMachineMap =
      new ConcurrentHashMap<>();
  private final AirReplicationRPCService rpcService;
  private final RegisterManager registerManager = new RegisterManager();
  // Per-group locks serializing createLocalPeer/deleteLocalPeer for the same group.
  private final Map<ConsensusGroupId, ReentrantLock> replicationGroupIdReentrantLockMap =
      new ConcurrentHashMap<>();
  // Read lock: per-group map mutations; write lock: whole-map sweeps (guardian job).
  private final ReentrantReadWriteLock stateMachineMapLock = new ReentrantReadWriteLock();
  private final AirReplicationConfig config;
  private final AirReplicationManager airReplicationManager;
  private final AirReplicationGuardian airReplicationGuardian;
  // Shared process-wide client managers, obtained from IoTV2GlobalComponentContainer.
  private final IClientManager<TEndPoint, AsyncAirReplicationServiceClient> asyncClientManager;
  private final IClientManager<TEndPoint, SyncAirReplicationServiceClient> syncClientManager;
  // Expected peer lists recorded before start(); null means "trust locally recovered state".
  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
+
  /**
   * Wires the consensus layer from the node-level {@link ConsensusConfig}; heavy work
   * (recovery, RPC startup) is deferred to {@link #start()}.
   */
  public AirReplication(ConsensusConfig config, IStateMachine.Registry registry) {
    this.thisNode = config.getThisNodeEndPoint();
    this.thisNodeId = config.getThisNodeId();
    this.storageDir = new File(config.getStorageDir());
    this.config = config.getAirReplicationConfig();
    this.registry = registry;
    this.rpcService = new AirReplicationRPCService(thisNode, config.getAirReplicationConfig());
    this.airReplicationManager =
        new AirReplicationManager(
            config.getAirReplicationConfig().getAir(),
            config.getAirReplicationConfig().getReplicateMode());
    this.airReplicationGuardian =
        config.getAirReplicationConfig().getAir().getAirReplicationGuardian();
    // Client managers are global singletons shared across the process, not owned here.
    this.asyncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
    this.syncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
  }
+
  /**
   * Starts the RPC service, waits for the asynchronous local-group recovery kicked off by
   * {@link #initAndRecover()}, and only then launches the guardian background job.
   *
   * @throws IOException if the storage dir cannot be created or RPC registration fails
   */
  @Override
  public synchronized void start() throws IOException {
    Future<Void> recoverFuture = initAndRecover();

    rpcService.initSyncedServiceImpl(new AirReplicationRPCServiceProcessor(this, config.getAir()));
    try {
      registerManager.register(rpcService);
    } catch (StartupException e) {
      throw new IOException(e);
    }

    try {
      recoverFuture.get();
    } catch (CancellationException ce) {
      LOGGER.info("IoTV2 Recover Task is cancelled", ce);
    } catch (ExecutionException ee) {
      // Recovery failures are logged, not fatal: the guardian job cleans up leftovers later.
      LOGGER.error("Exception while waiting for recover future completion", ee);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
    }
    // only when we recover all replication group can we launch async backend checker thread
    airReplicationGuardian.start(
        REPLICATION_AIR_GUARDIAN_TASK_ID,
        this::checkAllAirReplication,
        config.getAir().getAirReplicationGuardJobIntervalInSeconds());
  }
+
+  private Future<Void> initAndRecover() throws IOException {
+    if (!storageDir.exists()) {
+      // init
+      if (!storageDir.mkdirs()) {
+        LOGGER.warn("Unable to create replication dir at {}", storageDir);
+        throw new IOException(String.format("Unable to create replication dir 
at %s", storageDir));
+      }
+      return CompletableFuture.completedFuture(null);
+    } else {
+      // asynchronously recover, retry logic is implemented at 
AirReplicationImpl
+      return CompletableFuture.runAsync(
+              () -> {
+                try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
+                  for (Path path : stream) {
+                    ConsensusGroupId consensusGroupId =
+                        parsePeerFileName(path.getFileName().toString());
+                    try {
+                      AirReplicationServerImpl replication =
+                          new AirReplicationServerImpl(
+                              new Peer(consensusGroupId, thisNodeId, thisNode),
+                              registry.apply(consensusGroupId),
+                              new ArrayList<>(),
+                              config,
+                              airReplicationManager,
+                              syncClientManager);
+                      stateMachineMap.put(consensusGroupId, replication);
+                      checkPeerListAndStartIfEligible(consensusGroupId, 
replication);
+                    } catch (Exception e) {
+                      LOGGER.error(
+                          "Failed to recover replication from {} for {}, 
ignore it and continue recover other group, async backend checker thread will 
automatically deregister related air side effects for this failed replication 
group.",
+                          storageDir,
+                          consensusGroupId,
+                          e);
+                    }
+                  }
+                } catch (IOException e) {
+                  LOGGER.error(
+                      "Failed to recover replication from {} because read dir 
failed", storageDir, e);
+                }
+              })
+          .exceptionally(
+              e -> {
+                LOGGER.error("Failed to recover replication from {}", 
storageDir, e);
+                return null;
+              });
+    }
+  }
+
+  private void checkPeerListAndStartIfEligible(
+      ConsensusGroupId consensusGroupId, AirReplicationServerImpl replication) 
throws IOException {
+    BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+        (dataRegionId, peers) -> {
+          try {
+            resetPeerList(dataRegionId, peers);
+          } catch (ConsensusGroupNotExistException ignore) {
+
+          } catch (Exception e) {
+            LOGGER.warn("Failed to reset peer list while start", e);
+          }
+        };
+
+    if (correctPeerListBeforeStart != null) {
+      if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
+        // make peers which are in list correct
+        resetPeerListWithoutThrow.accept(
+            consensusGroupId, 
correctPeerListBeforeStart.get(consensusGroupId));
+        replication.start(true);
+      } else {
+        // clear peers which are not in the list
+        resetPeerListWithoutThrow.accept(consensusGroupId, 
Collections.emptyList());
+      }
+
+    } else {
+      replication.start(true);
+    }
+  }
+
  /**
   * Stops all replication groups and background services.
   *
   * <p>NOTE(review): this closes the process-wide async/sync client managers taken from
   * {@link IoTV2GlobalComponentContainer} — assumes this consensus instance is their sole user;
   * confirm no other component still needs them after stop.
   */
  @Override
  public synchronized void stop() {
    asyncClientManager.close();
    syncClientManager.close();
    registerManager.deregisterAll();
    airReplicationGuardian.stop();
    stateMachineMap.values().parallelStream().forEach(AirReplicationServerImpl::stop);
    IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
  }
+
  /**
   * Guardian job: reconciles registered air replications with the groups hosted locally.
   * Replications sent from this node are checked against their group's server; replications
   * whose group no longer exists here are marked {@code DROPPED}.
   */
  private void checkAllAirReplication() {
    // Only replications whose sender is this node are this node's responsibility.
    final Map<ConsensusGroupId, Map<AirReplicationName, PipeStatus>> existedAirs =
        airReplicationManager.getAllAirReplication().entrySet().stream()
            .filter(entry -> entry.getKey().getSenderDataNodeId() == thisNodeId)
            .collect(
                Collectors.groupingBy(
                    entry -> entry.getKey().getConsensusGroupId(),
                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    // Write lock keeps the whole sweep atomic w.r.t. concurrent group creation/deletion.
    stateMachineMapLock.writeLock().lock();
    try {
      stateMachineMap.forEach(
          (key, value) ->
              value.checkAirReplication(existedAirs.getOrDefault(key, ImmutableMap.of())));
      // Drop replications that belong to groups no longer hosted on this node.
      existedAirs.entrySet().stream()
          .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
          .flatMap(entry -> entry.getValue().keySet().stream())
          .forEach(
              airReplicationName -> {
                try {
                  LOGGER.warn(
                      "{} drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName);
                  airReplicationManager.updateAirReplication(airReplicationName, PipeStatus.DROPPED);
                } catch (Exception e) {
                  LOGGER.warn(
                      "{} cannot drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName,
                      e);
                }
              });
    } finally {
      stateMachineMapLock.writeLock().unlock();
    }
  }
+
+  @Override
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    final AirReplicationServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.isReadOnly()) {
+      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
+    } else if (!impl.isActive()) {
+      return RpcUtils.getStatus(
+          TSStatusCode.WRITE_PROCESS_REJECT,
+          "current node is not active and is not ready to receive user 
write.");
+    } else {
+      return impl.write(request);
+    }
+  }
+
  /**
   * Routes a read request to the group's replication server.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   */
  @Override
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    return Optional.ofNullable(stateMachineMap.get(groupId))
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
        .read(request);
  }
+
+  private String getPeerDir(ConsensusGroupId groupId) {
+    return storageDir + File.separator + groupId.getType().getValue() + "_" + 
groupId.getId();
+  }
+
+  private ConsensusGroupId parsePeerFileName(String fileName) {
+    String[] items = fileName.split("_");
+    return ConsensusGroupId.Factory.create(Integer.parseInt(items[0]), 
Integer.parseInt(items[1]));
+  }
+
  /**
   * Creates and starts the local peer for {@code groupId}.
   *
   * <p>Serialized per group via {@code replicationGroupIdReentrantLockMap}; additionally takes
   * the map read lock so the guardian's whole-map sweep (write lock) cannot run concurrently.
   *
   * @throws IllegalPeerNumException if {@code peers} is empty
   * @throws IllegalPeerEndpointException if this node is not part of {@code peers}
   * @throws ConsensusGroupAlreadyExistException if the group already exists locally
   * @throws ConsensusException if the group dir cannot be created or startup fails
   */
  @Override
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
      throws ConsensusException {
    final int replicationGroupSize = peers.size();
    if (replicationGroupSize == 0) {
      throw new IllegalPeerNumException(replicationGroupSize);
    }
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      throw new IllegalPeerEndpointException(thisNode, peers);
    }

    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupAlreadyExistException(groupId);
        }

        final String path = getPeerDir(groupId);
        File replicationDir = new File(path);
        if (!replicationDir.exists() && !replicationDir.mkdirs()) {
          LOGGER.warn("Unable to create replication dir for group {} at {}", groupId, path);
          throw new ConsensusException(
              String.format("Unable to create replication dir for group %s", groupId));
        }

        AirReplicationServerImpl replication =
            new AirReplicationServerImpl(
                new Peer(groupId, thisNodeId, thisNode),
                registry.apply(groupId),
                peers,
                config,
                airReplicationManager,
                syncClientManager);
        stateMachineMap.put(groupId, replication);
        replication.start(false); // air will start after creating
        KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
      } catch (IOException e) {
        LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
        throw new ConsensusException(e);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
    }
  }
+
  /**
   * Stops, clears and removes the local peer for {@code groupId} and deletes its storage dir.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   */
  @Override
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (!stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupNotExistException(groupId);
        }
        LOGGER.info("[{}] start to delete local peer for group {}", CLASS_NAME, groupId);
        final AirReplicationServerImpl replication = stateMachineMap.get(groupId);
        replication.clear();
        stateMachineMap.remove(groupId);

        FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
        KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
        LOGGER.info("[{}] finish deleting local peer for group {}", CLASS_NAME, groupId);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
      // NOTE(review): removing the lock entry can race with a thread that fetched it via
      // computeIfAbsent but has not locked yet — confirm create/delete of the same group
      // cannot overlap in that window.
      replicationGroupIdReentrantLockMap.remove(groupId);
    }
  }
+
  /**
   * Adds a remote peer to the group in four coordinated steps (inactivate, create air
   * replications, wait for transfer, activate); on failure the created replications are
   * rolled back best-effort.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   * @throws PeerAlreadyInConsensusGroupException if {@code peer} is already a member
   * @throws ConsensusException wrapping any step failure after rollback was attempted
   */
  @Override
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.containsPeer(peer)) {
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
    }
    try {
      // step 1: inactive new Peer to prepare for following steps
      LOGGER.info("[{}] inactivate new peer: {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, false);

      // step 2: notify all the other Peers to create air replications to newPeer
      // NOTE: For this step, all the other peers will try to transfer its user write data to target
      LOGGER.info("[{}] notify current peers to create air replications...", CLASS_NAME);
      impl.notifyPeersToCreateAirReplications(peer);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

      // step 3: wait until all other Peers finish transferring
      LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
      impl.waitPeersToTargetPeerTransmissionCompleted(peer);

      // step 4: active new Peer to let new Peer receive client requests
      LOGGER.info("[{}] activate new peer...", CLASS_NAME);
      impl.setRemotePeerActive(peer, true, false);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
    } catch (ConsensusGroupModifyPeerException e) {
      try {
        LOGGER.warn(
            "[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME, e);

        // roll back
        impl.notifyPeersToDropAirReplication(peer);

      } catch (ConsensusGroupModifyPeerException mpe) {
        LOGGER.error(
            "[{}] failed to cleanup side effects after failed to add remote peer", CLASS_NAME, mpe);
      }
      throw new ConsensusException(e);
    }
  }
+
  /**
   * Removes a remote peer: drops replications towards it, inactivates it, then waits for its
   * outgoing transfers to complete and its resources to be released.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   * @throws PeerNotInConsensusGroupException if {@code peer} is not a member
   * @throws ConsensusException wrapping any step failure
   */
  @Override
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (!impl.containsPeer(peer)) {
      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

    try {
      // let other peers to drop air replications to target
      LOGGER.info("[{}] notify other peers to drop air replications...", CLASS_NAME);
      impl.notifyPeersToDropAirReplication(peer);
      KillPoint.setKillPoint(
          IoTConsensusRemovePeerCoordinatorKillPoints
              .AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);

      // let target peer reject new write
      LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, true);
      KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);

      // wait its air replications to complete
      LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME, peer);
      impl.waitTargetPeerToPeersTransmissionCompleted(peer);

      // wait target peer to release all resource
      LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
      impl.waitReleaseAllRegionRelatedResource(peer);
    } catch (ConsensusGroupModifyPeerException e) {
      throw new ConsensusException(e);
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
  }
+
  /**
   * Records the expected peer configuration to reconcile against during {@link #start()};
   * must be called before starting to take effect.
   */
  @Override
  public void recordCorrectPeerListBeforeStarting(
      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
    LOGGER.info("Record correct peer list: {}", correctPeerList);
    this.correctPeerListBeforeStart = correctPeerList;
  }
+
  /**
   * Reconciles the local peer list with {@code correctPeers}: deletes the local peer entirely
   * if it is not in the correct list, otherwise drops sync channels to stale peers and builds
   * channels to missing ones. Per-peer failures are logged, not propagated.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   */
  @Override
  public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
      throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));

    if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      LOGGER.warn(
          "[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.",
          groupId);
      deleteLocalPeer(groupId);
      return;
    }

    // Snapshot the current membership; impl.getPeers() may change as we mutate below.
    ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
    String previousPeerListStr = impl.getPeers().toString();
    // remove invalid peer
    for (Peer peer : currentPeers) {
      if (!correctPeers.contains(peer)) {
        try {
          impl.dropAirReplicationToTargetPeer(peer);
          LOGGER.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.error(
              "[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // add correct peer
    for (Peer peer : correctPeers) {
      if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
        try {
          impl.createAirReplicationToTargetPeer(peer, false);
          LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.warn(
              "[RESET PEER LIST] {} Failed to build sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // show result
    String currentPeerListStr = impl.getPeers().toString();
    if (!previousPeerListStr.equals(currentPeerListStr)) {
      LOGGER.info(
          "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
          groupId,
          previousPeerListStr,
          impl.getPeers());
    } else {
      LOGGER.info(
          "[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}",
          groupId,
          previousPeerListStr);
    }
  }
+
  /**
   * Unsupported: in this protocol every peer acts as a writer, so there is no leader to
   * transfer.
   *
   * @throws ConsensusException always
   */
  @Override
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
    throw new ConsensusException(String.format("%s does not support leader transfer", CLASS_NAME));
  }
+
  /**
   * Validates the group exists, then deliberately does nothing: snapshots are not needed for
   * peer changes in this protocol.
   *
   * @throws ConsensusGroupNotExistException if the group is not hosted on this node
   */
  @Override
  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws ConsensusException {
    if (!stateMachineMap.containsKey(groupId)) {
      throw new ConsensusGroupNotExistException(groupId);
    }
    // Do nothing here because we do not need to transfer snapshot when there are new peers
  }
+
  /** Always true: every peer is treated as a leader in this leaderless protocol. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }
+
  /** Not implemented yet; always returns 0. */
  @Override
  public long getLogicalClock(ConsensusGroupId groupId) {
    // TODO: check logical clock
    return 0;
  }
+
  /** Always true: see {@link #isLeader(ConsensusGroupId)}. */
  @Override
  public boolean isLeaderReady(ConsensusGroupId groupId) {
    return true;
  }
+
+  @Override
+  public Peer getLeader(ConsensusGroupId groupId) {
+    if (!stateMachineMap.containsKey(groupId)) {
+      return null;
+    }
+    return new Peer(groupId, thisNodeId, thisNode);
+  }
+
+  @Override
+  public int getReplicationNum(ConsensusGroupId groupId) {
+    AirReplicationServerImpl impl = stateMachineMap.get(groupId);
+    return impl != null ? impl.getPeers().size() : 0;
+  }
+
  /** Returns a snapshot copy of the ids of all groups hosted on this node. */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }
+
  /** Returns the on-disk region dir for {@code groupId}; see {@code getPeerDir}. */
  @Override
  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
    return getPeerDir(groupId);
  }
+
  /** Intentionally a no-op; see the inline comment for where reload is handled. */
  @Override
  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    // AirReplication doesn't support reload consensus config, related config can be reloaded in
    // iotdb-core layer.
  }
+
  /** Returns the group's replication server, or {@code null} if not hosted on this node. */
  public AirReplicationServerImpl getImpl(ConsensusGroupId groupId) {
    return stateMachineMap.get(groupId);
  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
new file mode 100644
index 00000000000..dd7ada46d48
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import java.util.Map;
+
/**
 * Lifecycle operations on a single air replication (create / start / stop / drop), to be
 * implemented by the component that owns the underlying pipe tasks.
 */
public interface AirReplicationDispatcher {
  /**
   * Creates an air replication from pipe-style attribute maps.
   *
   * @param needManuallyStart if true, the replication is created stopped and must be started
   *     explicitly via {@link #startAir(String)}
   */
  void createAir(
      String airName,
      Map<String, String> extractorAttributes,
      Map<String, String> processorAttributes,
      Map<String, String> connectorAttributes,
      boolean needManuallyStart)
      throws Exception;

  /** Starts a previously created air replication. */
  void startAir(String airName) throws Exception;

  /** Stops a running air replication without removing it. */
  void stopAir(String airName) throws Exception;

  /**
   * Use AirReplicationName instead of String to provide information for receiverAgent to release
   * corresponding resource
   */
  void dropAir(AirReplicationName airName) throws Exception;
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
new file mode 100644
index 00000000000..91a3b82480a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
/** Periodic background job runner guarding air replications. */
public interface AirReplicationGuardian {
  /** Schedules {@code guardJob} under identifier {@code id} every {@code intervalInSeconds}. */
  void start(String id, Runnable guardJob, long intervalInSeconds);

  /** Cancels the scheduled guard job. */
  void stop();
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
new file mode 100644
index 00000000000..897a4278b80
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig.ReplicateMode;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.external.commons.lang3.tuple.ImmutableTriple;
+import org.apache.tsfile.external.commons.lang3.tuple.Triple;
+
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
+
+public class AirReplicationManager {
+  // Extract data.insert and data.delete to support deletion.
+  private static final String REPLICATION_EXTRACTOR_INCLUSION_VALUE = "data";
+  private final AirReplicationConfig.Air config;
+  private final ReplicateMode replicateMode;
+  private final AirReplicationDispatcher dispatcher;
+  private final AirReplicationSelector selector;
+
+  public AirReplicationManager(AirReplicationConfig.Air config, ReplicateMode 
replicateMode) {
+    this.config = config;
+    this.replicateMode = replicateMode;
+    this.dispatcher = config.getAirReplicationDispatcher();
+    this.selector = config.getAirReplicationSelector();
+  }
+
+  /** This method is used except region migration. */
+  public void createAirReplication(Peer senderPeer, Peer receiverPeer) throws 
Exception {
+    AirReplicationName airReplicationName = new AirReplicationName(senderPeer, 
receiverPeer);
+    // The third parameter is only used when region migration. Since this 
method is not called by
+    // region migration, just pass senderPeer in to get the correct result.
+    Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
+        params = buildAirParams(senderPeer, receiverPeer);
+    dispatcher.createAir(
+        airReplicationName.toString(),
+        params.getLeft(),
+        params.getMiddle(),
+        params.getRight(),
+        false);
+  }
+
+  /** This method is used when executing region migration */
+  public void createAirReplication(Peer senderPeer, Peer receiverPeer, boolean 
needManuallyStart)
+      throws Exception {
+    AirReplicationName airReplicationName = new AirReplicationName(senderPeer, 
receiverPeer);
+    Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
+        params = buildAirParams(senderPeer, receiverPeer);
+    dispatcher.createAir(
+        airReplicationName.toString(),
+        params.getLeft(),
+        params.getMiddle(),
+        params.getRight(),
+        needManuallyStart);
+  }
+
+  public Triple<
+          ImmutableMap<String, String>, ImmutableMap<String, String>, 
ImmutableMap<String, String>>
+      buildAirParams(final Peer senderPeer, final Peer receiverPeer) {
+    final AirReplicationName airReplicationName = new 
AirReplicationName(senderPeer, receiverPeer);
+    return new ImmutableTriple<>(
+        ImmutableMap.<String, String>builder()
+            .put(EXTRACTOR_KEY, config.getExtractorPluginName())
+            .put(EXTRACTOR_INCLUSION_KEY, 
REPLICATION_EXTRACTOR_INCLUSION_VALUE)
+            .put(
+                EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
+                airReplicationName.getConsensusGroupId().toString())
+            .put(
+                EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
+                String.valueOf(airReplicationName.getSenderDataNodeId()))
+            .put(
+                EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
+                String.valueOf(airReplicationName.getReceiverDataNodeId()))
+            .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+            .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+            .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
+            .put(
+                EXTRACTOR_IOTDB_USER_KEY,
+                
CommonDescriptor.getInstance().getConfig().getDefaultAdminName())
+            .build(),
+        ImmutableMap.<String, String>builder()
+            .put(PROCESSOR_KEY, config.getProcessorPluginName())
+            .build(),
+        ImmutableMap.<String, String>builder()
+            .put(CONNECTOR_KEY, config.getConnectorPluginName())
+            .put(
+                CONNECTOR_CONSENSUS_GROUP_ID_KEY,
+                
String.valueOf(airReplicationName.getConsensusGroupId().getId()))
+            .put(CONNECTOR_CONSENSUS_PIPE_NAME, airReplicationName.toString())
+            .put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
+            .put(CONNECTOR_IOTDB_PORT_KEY, 
String.valueOf(receiverPeer.getEndpoint().port))
+            .put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
+            .put(CONNECTOR_REALTIME_FIRST_KEY, String.valueOf(false))
+            .build());
+  }
+
+  public void dropAirReplication(Peer senderPeer, Peer receiverPeer) throws 
Exception {
+    AirReplicationName airReplicationName = new AirReplicationName(senderPeer, 
receiverPeer);
+    dispatcher.dropAir(airReplicationName);
+  }
+
+  public void updateAirReplication(AirReplicationName airReplicationName, 
PipeStatus pipeStatus)
+      throws Exception {
+    if (PipeStatus.RUNNING.equals(pipeStatus)) {
+      dispatcher.startAir(airReplicationName.toString());
+    } else if (PipeStatus.STOPPED.equals(pipeStatus)) {
+      dispatcher.stopAir(airReplicationName.toString());
+    } else if (PipeStatus.DROPPED.equals(pipeStatus)) {
+      dispatcher.dropAir(airReplicationName);
+    } else {
+      throw new IllegalArgumentException("Unsupported air status: " + 
pipeStatus);
+    }
+  }
+
+  public Map<AirReplicationName, PipeStatus> getAllAirReplication() {
+    return selector.getAllAirReplication();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
new file mode 100644
index 00000000000..687285468ca
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.consensus.common.Peer;
+
+import java.util.Objects;
+
+public class AirReplicationName {
+  private static final String REPLICATION_AIR_NAME_SPLITTER_CHAR = "_";
+  private final ConsensusGroupId consensusGroupId;
+  private final int senderDataNodeId;
+  private final int receiverDataNodeId;
+
+  public AirReplicationName(Peer senderPeer, Peer receiverPeer) {
+    this.consensusGroupId = senderPeer.getGroupId();
+    this.senderDataNodeId = senderPeer.getNodeId();
+    this.receiverDataNodeId = receiverPeer.getNodeId();
+  }
+
+  public AirReplicationName(
+      ConsensusGroupId consensusGroupId, int senderDataNodeId, int 
receiverDataNodeId) {
+    this.consensusGroupId = consensusGroupId;
+    this.senderDataNodeId = senderDataNodeId;
+    this.receiverDataNodeId = receiverDataNodeId;
+  }
+
+  public AirReplicationName(String airName) throws IllegalArgumentException {
+    if (!airName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+      throw new IllegalArgumentException("Invalid air name: " + airName);
+    }
+    String[] airNameParts =
+        airName
+            .substring(PipeStaticMeta.CONSENSUS_PIPE_PREFIX.length())
+            .split(REPLICATION_AIR_NAME_SPLITTER_CHAR);
+    if (airNameParts.length != 3) {
+      throw new IllegalArgumentException("Invalid air name: " + airName);
+    }
+    this.consensusGroupId = 
ConsensusGroupId.Factory.createFromString(airNameParts[0]);
+    this.senderDataNodeId = Integer.parseInt(airNameParts[1]);
+    this.receiverDataNodeId = Integer.parseInt(airNameParts[2]);
+  }
+
+  public ConsensusGroupId getConsensusGroupId() {
+    return consensusGroupId;
+  }
+
+  public int getSenderDataNodeId() {
+    return senderDataNodeId;
+  }
+
+  public int getReceiverDataNodeId() {
+    return receiverDataNodeId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    AirReplicationName that = (AirReplicationName) o;
+    return Objects.equals(consensusGroupId, that.consensusGroupId)
+        && Objects.equals(senderDataNodeId, that.senderDataNodeId)
+        && Objects.equals(receiverDataNodeId, that.receiverDataNodeId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(consensusGroupId, senderDataNodeId, 
receiverDataNodeId);
+  }
+
+  @Override
+  public String toString() {
+    return String.join(
+        REPLICATION_AIR_NAME_SPLITTER_CHAR,
+        PipeStaticMeta.CONSENSUS_PIPE_PREFIX + consensusGroupId,
+        String.valueOf(senderDataNodeId),
+        String.valueOf(receiverDataNodeId));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
new file mode 100644
index 00000000000..ea254b4227e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferResp;
+
/** Receiver side of air replication: handles transfer requests pushed from the sender peer. */
public interface AirReplicationReceiver {
  /** Processes one transfer request and returns the response to send back to the sender. */
  TAirReplicationTransferResp receive(TAirReplicationTransferReq req);

  /** Releases receiver-side resources held for the given data region (e.g. on region removal). */
  void releaseReceiverResource(DataRegionId regionId);
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
new file mode 100644
index 00000000000..3bf5a81ac06
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+
+import java.util.Map;
+
/** Queries the currently existing air-replication tasks. */
public interface AirReplicationSelector {
  /** Returns all known air-replication tasks mapped to their current pipe status. */
  Map<AirReplicationName, PipeStatus> getAllAirReplication();
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
new file mode 100644
index 00000000000..fe3bb009c17
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.air.airreplication;
+
/**
 * Sink (connector) side of an air-replication task, exposing the two progress counters from which
 * sync lag is computed (see AirReplicationSyncLagManager: lag = leader progress - follower
 * progress, floored at 0).
 */
public interface AirReplicationSink {
  /** Returns the leader-side replicate progress counter. */
  long getLeaderReplicateProgress();

  /** Returns the follower-side apply progress counter. */
  long getFollowerApplyProgress();
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
new file mode 100644
index 00000000000..9cf830b0edd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+
/**
 * Tracks and assigns replicate progress (as {@link ProgressIndex}) for air-replication tasks of a
 * consensus group.
 */
public interface ReplicateProgressManager {
  /** Returns the current progress index of the given air-replication task. */
  ProgressIndex getProgressIndex(AirReplicationName airReplicationName);

  /** Assigns and returns a new progress index for the given consensus group. */
  ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId);

  /** Returns the maximum progress index assigned so far for the given consensus group. */
  ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId);

  /**
   * Returns the sync lag of the given task within the group.
   * NOTE(review): the unit (event count vs. index delta) is not visible here — confirm with
   * the implementation.
   */
  long getSyncLagForSpecificAirReplication(
      ConsensusGroupId consensusGroupId, AirReplicationName airReplicationName);

  /**
   * Pins the current replicate index for the given task during region migration — presumably so
   * that progress is snapshotted/not advanced while the region moves; verify against callers.
   */
  void pinReplicateIndexForRegionMigration(
      ConsensusGroupId consensusGroupId, AirReplicationName airReplicationName);
}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
new file mode 100644
index 00000000000..c666d7b7e03
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+  private final AirReplicationServerImpl impl;
+  private final AirReplicationSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "AirReplicationServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.AIR_REPLICATION_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }
+
+  public void unbindGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_CONSENSUS_MODE.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.TYPE.toString(),
+        "replicateMode");
+  }
+
+  public void bindAutoGauge(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.AIR_REPLICATION.toString(),
+        MetricLevel.IMPORTANT,
+        syncLagManager,
+        PipeConsensusSyncLagManager::calculateSyncLag,
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void unbindAutoGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.AIR_REPLICATION.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void bindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "getStateMachineLock",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    userWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "userWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    replicaWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_RECEIVE_EVENT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "replicaWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+  }
+
+  public void unbindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    userWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    replicaWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.AIR_REPLICATION.toString(),
+        Tag.TYPE.toString(),
+        "getStateMachineLock",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.AIR_REPLICATION.toString(),
+        Tag.TYPE.toString(),
+        "writeStateMachine",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_RECEIVE_EVENT.toString(),
+        Tag.NAME.toString(),
+        Metric.AIR_REPLICATION.toString(),
+        Tag.TYPE.toString(),
+        "replicaWriteStateMachine",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
new file mode 100644
index 00000000000..1f796d38dea
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationSink;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is used to aggregate the write progress of all Connectors to 
calculate the minimum
+ * synchronization progress of all follower copies, thereby calculating 
syncLag.
+ *
+ * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of 
this class.
+ */
+public class AirReplicationSyncLagManager {
+  long syncLag = Long.MIN_VALUE;
+  ReentrantLock lock = new ReentrantLock();
+  Map<AirReplicationName, AirReplicationSink> airReplication2ConnectorMap = 
new ConcurrentHashMap<>();
+
+  /**
+   * pinnedCommitIndex - currentReplicateProgress. If res <= 0, indicating 
that replication is
+   * finished.
+   */
+  public long getSyncLagForRegionMigration(
+      AirReplicationName airReplicationName, long pinnedCommitIndex) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(
+            airReplicationSink ->
+                Math.max(pinnedCommitIndex - 
airReplicationSink.getFollowerApplyProgress(), 0L))
+        .orElse(0L);
+  }
+
+  /**
+   * userWriteProgress - currentReplicateProgress. If res <= 0, indicating 
that replication is
+   * finished.
+   */
+  public long getSyncLagForSpecificAirReplication(AirReplicationName 
airReplicationName) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(
+            airReplicationSink -> {
+              long userWriteProgress = 
airReplicationSink.getLeaderReplicateProgress();
+              long replicateProgress = 
airReplicationSink.getFollowerApplyProgress();
+              return Math.max(userWriteProgress - replicateProgress, 0L);
+            })
+        .orElse(0L);
+  }
+
+  public long getCurrentLeaderReplicateIndex(AirReplicationName 
airReplicationName) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(AirReplicationSink::getLeaderReplicateProgress)
+        .orElse(0L);
+  }
+
+  public void addAirReplicationConnector(
+      AirReplicationName airReplicationName, AirReplicationSink 
airReplicationSink) {
+    lock.lock();
+    try {
+      airReplication2ConnectorMap.put(airReplicationName, airReplicationSink);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void removeAirReplicationConnector(AirReplicationName 
airReplicationName) {
+    lock.lock();
+    try {
+      airReplication2ConnectorMap.remove(airReplicationName);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * SyncLag represents the biggest difference between the current replica 
users' write progress and
+   * the synchronization progress of all other replicas. The semantics is how 
much data the leader
+   * has left to synchronize.
+   */
+  public long calculateSyncLag() {
+    lock.lock();
+    try {
+      // if there isn't a air replication task, the syncLag is 0
+      if (airReplication2ConnectorMap.isEmpty()) {
+        return 0;
+      }
+      // else we find the biggest gap between leader and replicas in all air 
replication task.
+      syncLag = Long.MIN_VALUE;
+      airReplication2ConnectorMap
+          .keySet()
+          .forEach(
+              airReplicationName ->
+                  syncLag =
+                      Math.max(syncLag, 
getSyncLagForSpecificAirReplication(airReplicationName)));
+      return syncLag;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void clear() {
+    this.airReplication2ConnectorMap.clear();
+  }
+
+  private AirReplicationSyncLagManager() {
+    // do nothing
+  }
+
+  private static class AirReplicationSyncLagManagerHolder {
+    private static Map<String, AirReplicationSyncLagManager> 
REPLICATION_GROUP_ID_2_INSTANCE_MAP;
+
+    private AirReplicationSyncLagManagerHolder() {
+      // empty constructor
+    }
+
+    private static void build() {
+      if (REPLICATION_GROUP_ID_2_INSTANCE_MAP == null) {
+        REPLICATION_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+      }
+    }
+  }
+
+  public static AirReplicationSyncLagManager getInstance(String groupId) {
+    return 
AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+        groupId, key -> new AirReplicationSyncLagManager());
+  }
+
+  public static void release(String groupId) {
+    AirReplicationSyncLagManager.getInstance(groupId).clear();
+    
AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+  }
+
+  // Only when replication protocol is AirReplication, this method will be 
called once when construct
+  // replication class.
+  public static void build() {
+    AirReplicationSyncLagManagerHolder.build();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
new file mode 100644
index 00000000000..09cbfe96cc3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
+
+public class AirReplicationRPCService extends ThriftService implements 
AirReplicationRPCServiceMBean {
+
+  private final TEndPoint thisNode;
+  private final AirReplicationConfig config;
+  private AirReplicationRPCServiceProcessor airReplicationRPCServiceProcessor;
+
+  public AirReplicationRPCService(TEndPoint thisNode, AirReplicationConfig 
config) {
+    this.thisNode = thisNode;
+    this.config = config;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.AIR_REPLICATION_SERVICE;
+  }
+
+  @Override
+  public void initSyncedServiceImpl(Object airReplicationRPCServiceProcessor) {
+    this.airReplicationRPCServiceProcessor =
+        (AirReplicationRPCServiceProcessor) airReplicationRPCServiceProcessor;
+    super.initSyncedServiceImpl(this.airReplicationRPCServiceProcessor);
+  }
+
+  @Override
+  public void initTProcessor() {
+    processor = new 
AirReplicationIService.Processor<>(airReplicationRPCServiceProcessor);
+  }
+
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    try {
+      thriftServiceThread =
+          config.getRpc().isEnableSSL()
+              ? new ThriftServiceThread(
+                  processor,
+                  getID().getName(),
+                  ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+                  getBindIP(),
+                  getBindPort(),
+                  config.getRpc().getRpcMaxConcurrentClientNum(),
+                  config.getRpc().getThriftServerAwaitTimeForStopService(),
+                  new 
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+                  config.getRpc().isRpcThriftCompressionEnabled(),
+                  config.getRpc().getSslKeyStorePath(),
+                  config.getRpc().getSslKeyStorePassword(),
+                  config.getRpc().getSslTrustStorePath(),
+                  config.getRpc().getSslTrustStorePassword(),
+                  ZeroCopyRpcTransportFactory.INSTANCE)
+              : new ThriftServiceThread(
+                  processor,
+                  getID().getName(),
+                  ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+                  getBindIP(),
+                  getBindPort(),
+                  config.getRpc().getRpcMaxConcurrentClientNum(),
+                  config.getRpc().getThriftServerAwaitTimeForStopService(),
+                  new 
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+                  config.getRpc().isRpcThriftCompressionEnabled(),
+                  ZeroCopyRpcTransportFactory.INSTANCE);
+    } catch (RPCServiceException e) {
+      throw new IllegalAccessException(e.getMessage());
+    }
+    
thriftServiceThread.setName(ThreadName.AIR_REPLICATION_RPC_SERVICE.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return thisNode.getIp();
+  }
+
+  @Override
+  public int getBindPort() {
+    return thisNode.getPort();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
new file mode 100644
index 00000000000..716fa38e3fb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+public class AirReplicationRPCServiceHandler implements TServerEventHandler {
+
+  private final AirReplicationRPCServiceProcessor processor;
+
+  public AirReplicationRPCServiceHandler(AirReplicationRPCServiceProcessor 
processor) {
+    this.processor = processor;
+  }
+
+  @Override
+  public void preServe() {}
+
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    return null;
+  }
+
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, 
TProtocol output) {
+    processor.handleExit();
+  }
+
+  @Override
+  public void processContext(
+      ServerContext serverContext, TTransport inputTransport, TTransport 
outputTransport) {}
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
new file mode 100644
index 00000000000..52d0e7a53c1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
/** JMX MBean interface for the AirReplication RPC service; no attributes are exposed yet. */
public interface AirReplicationRPCServiceMBean {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
new file mode 100644
index 00000000000..aa36042e12f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusInactivatePeerKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.air.AirReplication;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedReq;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferResp;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AirReplicationRPCServiceProcessor implements 
AirReplicationIService.Iface {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AirReplicationRPCServiceProcessor.class);
+  private final AirReplication airReplication;
+
+  private final AirReplicationConfig.Air config;
+
+  public AirReplicationRPCServiceProcessor(
+      AirReplication airReplication, AirReplicationConfig.Air config) {
+    this.airReplication = airReplication;
+    this.config = config;
+  }
+
+  @Override
+  public TAirReplicationTransferResp 
airReplicationTransfer(TAirReplicationTransferReq req) {
+    return config.getAirReplicationReceiver().receive(req);
+  }
+
+  // TODO: consider batch transfer
+  @Override
+  public TAirReplicationBatchTransferResp airReplicationBatchTransfer(
+      TAirReplicationBatchTransferReq req) throws TException {
+    return new TAirReplicationBatchTransferResp();
+  }
+
+  @Override
+  public TSetActiveResp setActive(TSetActiveReq req) throws TException {
+    if (req.isForDeletionPurpose && !req.isActive) {
+      
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
+    }
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for set active request 
%s", groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TSetActiveResp(status);
+    }
+    impl.setActive(req.isActive);
+    if (req.isActive) {
+      KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
+    }
+    if (req.isForDeletionPurpose && !req.isActive) {
+      
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
+    }
+    return new TSetActiveResp(RpcUtils.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
+      TNotifyPeerToCreateConsensusPipeReq req) throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for create consensus pipe 
request %s", groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TNotifyPeerToCreateConsensusPipeResp(status);
+    }
+    TSStatus responseStatus;
+    try {
+      // Other peers which don't act as coordinator will only transfer 
data(may contain both
+      // historical and realtime data) after the snapshot progress.
+      impl.createConsensusPipeToTargetPeer(
+          new Peer(
+              
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
+              req.targetPeerNodeId,
+              req.targetPeerEndPoint),
+          false);
+      responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupModifyPeerException e) {
+      responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+      LOGGER.warn("Failed to create consensus pipe to target peer with req 
{}", req, e);
+    }
+    return new TNotifyPeerToCreateConsensusPipeResp(responseStatus);
+  }
+
+  @Override
+  public TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe(
+      TNotifyPeerToDropConsensusPipeReq req) throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for drop consensus pipe request 
%s", groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TNotifyPeerToDropConsensusPipeResp(status);
+    }
+    TSStatus responseStatus;
+    try {
+      impl.dropConsensusPipeToTargetPeer(
+          new Peer(
+              
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
+              req.targetPeerNodeId,
+              req.targetPeerEndPoint));
+      responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupModifyPeerException e) {
+      responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+      LOGGER.warn("Failed to drop consensus pipe to target peer with req {}", 
req, e);
+    }
+    return new TNotifyPeerToDropConsensusPipeResp(responseStatus);
+  }
+
+  @Override
+  public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(
+      TCheckConsensusPipeCompletedReq req) throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for check transfer completed 
request %s",
+              groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TCheckConsensusPipeCompletedResp(status, true);
+    }
+    TSStatus responseStatus;
+    boolean isCompleted;
+    try {
+      isCompleted =
+          impl.isConsensusPipesTransmissionCompleted(
+              req.consensusPipeNames, req.refreshCachedProgressIndex);
+      responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (Exception e) {
+      responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+      isCompleted = true;
+      LOGGER.warn(
+          "Failed to check consensus pipe completed with req {}, set is 
completed to {}",
+          req,
+          true,
+          e);
+    }
+    return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted);
+  }
+
+  @Override
+  public TWaitReleaseAllRegionRelatedResourceResp 
waitReleaseAllRegionRelatedResource(
+      TWaitReleaseAllRegionRelatedResourceReq req) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for 
TWaitReleaseAllRegionRelatedResourceRes request",
+              groupId);
+      LOGGER.error(message);
+      return new TWaitReleaseAllRegionRelatedResourceResp(true);
+    }
+    return new TWaitReleaseAllRegionRelatedResourceResp(
+        impl.hasReleaseAllRegionRelatedResource(groupId));
+  }
+
+  public void handleExit() {}
+}


Reply via email to