This is an automated email from the ASF dual-hosted git repository.
hxpserein pushed a commit to branch research/airreplication
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/airreplication by
this push:
new f9e05a4a44d system design for air-replication (#17012)
f9e05a4a44d is described below
commit f9e05a4a44d5eb7ffb343add549158a73377c4b8
Author: Gewu <[email protected]>
AuthorDate: Mon Jan 12 18:13:25 2026 +0800
system design for air-replication (#17012)
---
.../iotdb/db/airreplication/AirReplication.java | 600 +++++++++++++++++
.../airreplication/AirReplicationPeerManager.java | 73 +++
.../airreplication/AirReplicationServerImpl.java | 716 +++++++++++++++++++++
.../iotdb/db/airreplication/PipeConsensus.java | 600 +++++++++++++++++
.../airreplication/AirReplicationDispatcher.java | 42 ++
.../airreplication/AirReplicationGuardian.java | 26 +
.../airreplication/AirReplicationManager.java | 157 +++++
.../airreplication/AirReplicationName.java | 98 +++
.../airreplication/AirReplicationReceiver.java | 30 +
.../airreplication/AirReplicationSelector.java | 28 +
.../airreplication/AirReplicationSink.java | 25 +
.../airreplication/ReplicateProgressManager.java | 37 ++
.../metric/AirReplicationServerMetrics.java | 190 ++++++
.../metric/AirReplicationSyncLagManager.java | 157 +++++
.../service/AirReplicationRPCService.java | 106 +++
.../service/AirReplicationRPCServiceHandler.java | 51 ++
.../service/AirReplicationRPCServiceMBean.java | 22 +
.../service/AirReplicationRPCServiceProcessor.java | 225 +++++++
18 files changed, 3183 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
new file mode 100644
index 00000000000..77b217625b6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
/**
 * Consensus-layer entry point for the "air replication" protocol.
 *
 * <p>Implements {@link IConsensus} by keeping one {@code AirReplicationServerImpl} per
 * {@link ConsensusGroupId} in {@link #stateMachineMap}. Every local peer reports itself
 * as leader (see {@link #isLeader}); replicas are converged by asynchronous replication
 * channels between peers rather than by leader election.
 *
 * <p>Thread-safety: {@code stateMachineMap} is a {@link ConcurrentHashMap}. Structural
 * operations (create/delete local peer) additionally take a per-group lock from
 * {@link #replicationGroupIdReentrantLockMap} plus the read side of
 * {@link #stateMachineMapLock}, while the background guardian task takes the write side
 * so it sees a stable view of all groups.
 */
public class AirReplication implements IConsensus {
  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = "replication_air_guardian";
  private static final String CLASS_NAME = AirReplication.class.getSimpleName();
  private static final Logger LOGGER = LoggerFactory.getLogger(AirReplication.class);

  // Identity of this data node within every group it participates in.
  private final TEndPoint thisNode;
  private final int thisNodeId;
  // Root dir; each group gets a sub-dir named "<type>_<id>" (see getPeerDir).
  private final File storageDir;
  private final IStateMachine.Registry registry;
  // One replication server per consensus group hosted on this node.
  private final Map<ConsensusGroupId, AirReplicationServerImpl> stateMachineMap =
      new ConcurrentHashMap<>();
  private final AirReplicationRPCService rpcService;
  private final RegisterManager registerManager = new RegisterManager();
  // Per-group mutex serializing createLocalPeer/deleteLocalPeer for the same group.
  private final Map<ConsensusGroupId, ReentrantLock> replicationGroupIdReentrantLockMap =
      new ConcurrentHashMap<>();
  // Read lock: per-group structural ops; write lock: guardian's whole-map sweep.
  private final ReentrantReadWriteLock stateMachineMapLock = new ReentrantReadWriteLock();
  private final AirReplicationConfig config;
  private final AirReplicationManager airReplicationManager;
  private final AirReplicationGuardian airReplicationGuardian;
  // NOTE(review): these managers come from a global container; stop() closes them,
  // which assumes no other component still shares them — confirm ownership.
  private final IClientManager<TEndPoint, AsyncAirReplicationServiceClient> asyncClientManager;
  private final IClientManager<TEndPoint, SyncAirReplicationServiceClient> syncClientManager;
  // Set via recordCorrectPeerListBeforeStarting(); null means "trust on-disk state".
  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;

  /**
   * Builds the consensus instance from the generic {@link ConsensusConfig}.
   *
   * <p>Note the shadowing: the constructor parameter is the outer {@code ConsensusConfig}
   * while the field {@code this.config} is the nested {@code AirReplicationConfig}.
   */
  public AirReplication(ConsensusConfig config, IStateMachine.Registry registry) {
    this.thisNode = config.getThisNodeEndPoint();
    this.thisNodeId = config.getThisNodeId();
    this.storageDir = new File(config.getStorageDir());
    this.config = config.getAirReplicationConfig();
    this.registry = registry;
    this.rpcService = new AirReplicationRPCService(thisNode, config.getAirReplicationConfig());
    this.airReplicationManager =
        new AirReplicationManager(
            config.getAirReplicationConfig().getAir(),
            config.getAirReplicationConfig().getReplicateMode());
    this.airReplicationGuardian =
        config.getAirReplicationConfig().getAir().getAirReplicationGuardian();
    this.asyncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
    this.syncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
  }

  /**
   * Starts the RPC service, waits for asynchronous group recovery, then launches the
   * guardian. Recovery failures are logged but do not abort startup — the guardian is
   * expected to clean up side effects of groups that failed to recover.
   *
   * @throws IOException if the storage dir cannot be created or the RPC service fails
   *     to register
   */
  @Override
  public synchronized void start() throws IOException {
    Future<Void> recoverFuture = initAndRecover();

    rpcService.initSyncedServiceImpl(new AirReplicationRPCServiceProcessor(this, config.getAir()));
    try {
      registerManager.register(rpcService);
    } catch (StartupException e) {
      throw new IOException(e);
    }

    try {
      recoverFuture.get();
    } catch (CancellationException ce) {
      LOGGER.info("IoTV2 Recover Task is cancelled", ce);
    } catch (ExecutionException ee) {
      LOGGER.error("Exception while waiting for recover future completion", ee);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
    }
    // only when we recover all replication group can we launch async backend checker thread
    airReplicationGuardian.start(
        REPLICATION_AIR_GUARDIAN_TASK_ID,
        this::checkAllAirReplication,
        config.getAir().getAirReplicationGuardJobIntervalInSeconds());
  }

  /**
   * First boot: creates the storage dir and returns an already-completed future.
   * Restart: scans the dir asynchronously and rebuilds one server per group sub-dir.
   * Per-group failures are logged and skipped so one bad group cannot block the rest.
   */
  private Future<Void> initAndRecover() throws IOException {
    if (!storageDir.exists()) {
      // init
      if (!storageDir.mkdirs()) {
        LOGGER.warn("Unable to create replication dir at {}", storageDir);
        throw new IOException(String.format("Unable to create replication dir at %s", storageDir));
      }
      return CompletableFuture.completedFuture(null);
    } else {
      // asynchronously recover, retry logic is implemented at AirReplicationImpl
      return CompletableFuture.runAsync(
              () -> {
                try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
                  for (Path path : stream) {
                    // Dir name encodes the group id as "<type>_<id>".
                    ConsensusGroupId consensusGroupId =
                        parsePeerFileName(path.getFileName().toString());
                    try {
                      AirReplicationServerImpl replication =
                          new AirReplicationServerImpl(
                              new Peer(consensusGroupId, thisNodeId, thisNode),
                              registry.apply(consensusGroupId),
                              new ArrayList<>(),
                              config,
                              airReplicationManager,
                              syncClientManager);
                      stateMachineMap.put(consensusGroupId, replication);
                      checkPeerListAndStartIfEligible(consensusGroupId, replication);
                    } catch (Exception e) {
                      LOGGER.error(
                          "Failed to recover replication from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related air side effects for this failed replication group.",
                          storageDir,
                          consensusGroupId,
                          e);
                    }
                  }
                } catch (IOException e) {
                  LOGGER.error(
                      "Failed to recover replication from {} because read dir failed", storageDir, e);
                }
              })
          .exceptionally(
              e -> {
                LOGGER.error("Failed to recover replication from {}", storageDir, e);
                return null;
              });
    }
  }

  /**
   * Reconciles a recovered group against {@link #correctPeerListBeforeStart} (when one
   * was recorded): groups present in the list are reset to it and started; groups absent
   * from it have their peers cleared (which ends in local-peer deletion inside
   * resetPeerList). With no recorded list the group starts as-is.
   */
  private void checkPeerListAndStartIfEligible(
      ConsensusGroupId consensusGroupId, AirReplicationServerImpl replication) throws IOException {
    // Wrapper so a missing group or transient failure does not abort recovery.
    BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
        (dataRegionId, peers) -> {
          try {
            resetPeerList(dataRegionId, peers);
          } catch (ConsensusGroupNotExistException ignore) {

          } catch (Exception e) {
            LOGGER.warn("Failed to reset peer list while start", e);
          }
        };

    if (correctPeerListBeforeStart != null) {
      if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
        // make peers which are in list correct
        resetPeerListWithoutThrow.accept(
            consensusGroupId, correctPeerListBeforeStart.get(consensusGroupId));
        replication.start(true);
      } else {
        // clear peers which are not in the list
        resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList());
      }

    } else {
      replication.start(true);
    }
  }

  /** Shuts down clients, RPC service, guardian, and every group server. */
  @Override
  public synchronized void stop() {
    asyncClientManager.close();
    syncClientManager.close();
    registerManager.deregisterAll();
    airReplicationGuardian.stop();
    stateMachineMap.values().parallelStream().forEach(AirReplicationServerImpl::stop);
    IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
  }

  /**
   * Guardian job: cross-checks the replications registered in
   * {@link #airReplicationManager} (only those this node sends) against the live groups.
   * Each live group validates its own entries; entries whose group no longer exists
   * locally are marked DROPPED. Runs under the map write lock to freeze membership.
   */
  private void checkAllAirReplication() {
    // group id -> (replication name -> status), restricted to replications sent by this node
    final Map<ConsensusGroupId, Map<AirReplicationName, PipeStatus>> existedAirs =
        airReplicationManager.getAllAirReplication().entrySet().stream()
            .filter(entry -> entry.getKey().getSenderDataNodeId() == thisNodeId)
            .collect(
                Collectors.groupingBy(
                    entry -> entry.getKey().getConsensusGroupId(),
                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    stateMachineMapLock.writeLock().lock();
    try {
      stateMachineMap.forEach(
          (key, value) ->
              value.checkAirReplication(existedAirs.getOrDefault(key, ImmutableMap.of())));
      // Orphans: replications whose group is gone locally -> mark dropped.
      existedAirs.entrySet().stream()
          .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
          .flatMap(entry -> entry.getValue().keySet().stream())
          .forEach(
              airReplicationName -> {
                try {
                  LOGGER.warn(
                      "{} drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName);
                  airReplicationManager.updateAirReplication(airReplicationName, PipeStatus.DROPPED);
                } catch (Exception e) {
                  LOGGER.warn(
                      "{} cannot drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName,
                      e);
                }
              });
    } finally {
      stateMachineMapLock.writeLock().unlock();
    }
  }

  /**
   * Routes a write to the group's state machine, rejecting it while the node is
   * read-only or the local peer is inactive (mid add/remove-peer).
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    final AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.isReadOnly()) {
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
    } else if (!impl.isActive()) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          "current node is not active and is not ready to receive user write.");
    } else {
      return impl.write(request);
    }
  }

  /**
   * Routes a read to the group's state machine.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    return Optional.ofNullable(stateMachineMap.get(groupId))
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
        .read(request);
  }

  /** Storage sub-dir for one group: "<storageDir>/<type>_<id>". */
  private String getPeerDir(ConsensusGroupId groupId) {
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
  }

  /** Inverse of {@link #getPeerDir}: parses "<type>_<id>" back into a group id. */
  private ConsensusGroupId parsePeerFileName(String fileName) {
    String[] items = fileName.split("_");
    return ConsensusGroupId.Factory.create(Integer.parseInt(items[0]), Integer.parseInt(items[1]));
  }

  /**
   * Creates and starts the local replica of a group. The peer list must be non-empty
   * and contain this node. Guarded by the per-group lock (vs. concurrent
   * create/delete) and the map read lock (vs. the guardian sweep).
   *
   * @throws ConsensusException on invalid peers, pre-existing group, or I/O failure
   */
  @Override
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
      throws ConsensusException {
    final int replicationGroupSize = peers.size();
    if (replicationGroupSize == 0) {
      throw new IllegalPeerNumException(replicationGroupSize);
    }
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      throw new IllegalPeerEndpointException(thisNode, peers);
    }

    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupAlreadyExistException(groupId);
        }

        final String path = getPeerDir(groupId);
        File replicationDir = new File(path);
        if (!replicationDir.exists() && !replicationDir.mkdirs()) {
          LOGGER.warn("Unable to create replication dir for group {} at {}", groupId, path);
          throw new ConsensusException(
              String.format("Unable to create replication dir for group %s", groupId));
        }

        AirReplicationServerImpl replication =
            new AirReplicationServerImpl(
                new Peer(groupId, thisNodeId, thisNode),
                registry.apply(groupId),
                peers,
                config,
                airReplicationManager,
                syncClientManager);
        stateMachineMap.put(groupId, replication);
        replication.start(false); // air will start after creating
        KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
      } catch (IOException e) {
        LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
        throw new ConsensusException(e);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Tears down the local replica of a group and deletes its storage dir. Uses the same
   * two-level locking as {@link #createLocalPeer}; the per-group lock entry is removed
   * afterwards since the group is gone.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (!stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupNotExistException(groupId);
        }
        LOGGER.info("[{}] start to delete local peer for group {}", CLASS_NAME, groupId);
        final AirReplicationServerImpl replication = stateMachineMap.get(groupId);
        replication.clear();
        stateMachineMap.remove(groupId);

        FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
        KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
        LOGGER.info("[{}] finish deleting local peer for group {}", CLASS_NAME, groupId);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
      replicationGroupIdReentrantLockMap.remove(groupId);
    }
  }

  /**
   * Coordinator-side protocol for adding a remote peer: deactivate the new peer, have
   * existing peers build replication channels to it, wait for data transfer to catch up,
   * then activate it. On failure the channels are dropped (best-effort rollback).
   *
   * @throws ConsensusException if the group is missing, the peer is already a member, or
   *     any protocol step fails
   */
  @Override
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.containsPeer(peer)) {
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
    }
    try {
      // step 1: inactive new Peer to prepare for following steps
      LOGGER.info("[{}] inactivate new peer: {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, false);

      // step 2: notify all the other Peers to create air replications to newPeer
      // NOTE: For this step, all the other peers will try to transfer its user write data to target
      LOGGER.info("[{}] notify current peers to create air replications...", CLASS_NAME);
      impl.notifyPeersToCreateAirReplications(peer);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

      // step 3: wait until all other Peers finish transferring
      LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
      impl.waitPeersToTargetPeerTransmissionCompleted(peer);

      // step 4: active new Peer to let new Peer receive client requests
      LOGGER.info("[{}] activate new peer...", CLASS_NAME);
      impl.setRemotePeerActive(peer, true, false);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
    } catch (ConsensusGroupModifyPeerException e) {
      try {
        LOGGER.warn(
            "[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME, e);

        // roll back
        impl.notifyPeersToDropAirReplication(peer);

      } catch (ConsensusGroupModifyPeerException mpe) {
        LOGGER.error(
            "[{}] failed to cleanup side effects after failed to add remote peer", CLASS_NAME, mpe);
      }
      throw new ConsensusException(e);
    }
  }

  /**
   * Coordinator-side protocol for removing a remote peer: drop inbound channels to it,
   * deactivate it, drain its outbound replication, then wait for it to release all
   * region resources. No rollback — a failure surfaces as {@link ConsensusException}.
   *
   * @throws ConsensusException if the group is missing, the peer is not a member, or a
   *     protocol step fails
   */
  @Override
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (!impl.containsPeer(peer)) {
      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

    try {
      // let other peers to drop air replications to target
      LOGGER.info("[{}] notify other peers to drop air replications...", CLASS_NAME);
      impl.notifyPeersToDropAirReplication(peer);
      KillPoint.setKillPoint(
          IoTConsensusRemovePeerCoordinatorKillPoints
              .AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);

      // let target peer reject new write
      LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, true);
      KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);

      // wait its air replications to complete
      LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME, peer);
      impl.waitTargetPeerToPeersTransmissionCompleted(peer);

      // wait target peer to release all resource
      LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
      impl.waitReleaseAllRegionRelatedResource(peer);
    } catch (ConsensusGroupModifyPeerException e) {
      throw new ConsensusException(e);
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
  }

  /**
   * Records the authoritative peer lists to reconcile against during {@link #start()}
   * recovery. Must be called before {@code start()} to take effect.
   */
  @Override
  public void recordCorrectPeerListBeforeStarting(
      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
    LOGGER.info("Record correct peer list: {}", correctPeerList);
    this.correctPeerListBeforeStart = correctPeerList;
  }

  /**
   * Forces the group's peer list to {@code correctPeers}: if this node is absent the
   * local peer is deleted outright; otherwise stale sync channels are dropped and
   * missing ones are built. Channel failures are logged, not thrown — the next guardian
   * run or reset can retry.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
      throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));

    if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      LOGGER.warn(
          "[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.",
          groupId);
      deleteLocalPeer(groupId);
      return;
    }

    // Snapshot before mutation: we iterate currentPeers while impl's peer set changes.
    ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
    String previousPeerListStr = impl.getPeers().toString();
    // remove invalid peer
    for (Peer peer : currentPeers) {
      if (!correctPeers.contains(peer)) {
        try {
          impl.dropAirReplicationToTargetPeer(peer);
          LOGGER.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.error(
              "[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // add correct peer
    for (Peer peer : correctPeers) {
      if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
        try {
          impl.createAirReplicationToTargetPeer(peer, false);
          LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.warn(
              "[RESET PEER LIST] {} Failed to build sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // show result
    String currentPeerListStr = impl.getPeers().toString();
    if (!previousPeerListStr.equals(currentPeerListStr)) {
      LOGGER.info(
          "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
          groupId,
          previousPeerListStr,
          impl.getPeers());
    } else {
      LOGGER.info(
          "[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}",
          groupId,
          previousPeerListStr);
    }
  }

  /** Unsupported: air replication is leaderless, so there is no leader to transfer. */
  @Override
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
    throw new ConsensusException(String.format("%s does not support leader transfer", CLASS_NAME));
  }

  /** Validates the group exists; snapshotting itself is a no-op for this protocol. */
  @Override
  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws ConsensusException {
    if (!stateMachineMap.containsKey(groupId)) {
      throw new ConsensusGroupNotExistException(groupId);
    }
    // Do nothing here because we do not need to transfer snapshot when there are new peers
  }

  /** Leaderless protocol: every replica answers as leader. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }

  @Override
  public long getLogicalClock(ConsensusGroupId groupId) {
    // TODO: check logical clock
    return 0;
  }

  /** Leaderless protocol: the "leader" (this node) is always ready. */
  @Override
  public boolean isLeaderReady(ConsensusGroupId groupId) {
    return true;
  }

  /** Returns this node as leader for any locally hosted group, else {@code null}. */
  @Override
  public Peer getLeader(ConsensusGroupId groupId) {
    if (!stateMachineMap.containsKey(groupId)) {
      return null;
    }
    return new Peer(groupId, thisNodeId, thisNode);
  }

  /** Current replica count of the group, or 0 if not hosted here. */
  @Override
  public int getReplicationNum(ConsensusGroupId groupId) {
    AirReplicationServerImpl impl = stateMachineMap.get(groupId);
    return impl != null ? impl.getPeers().size() : 0;
  }

  /** Snapshot of all group ids currently hosted on this node. */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }

  @Override
  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
    return getPeerDir(groupId);
  }

  @Override
  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    // AirReplication doesn't support reload consensus config, related config can be reloaded in
    // iotdb-core layer.
  }

  /** Direct access to a group's server, or {@code null} if not hosted here. */
  public AirReplicationServerImpl getImpl(ConsensusGroupId groupId) {
    return stateMachineMap.get(groupId);
  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
new file mode 100644
index 00000000000..d8e467a642f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.consensus.common.Peer;
+
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AirReplicationPeerManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AirReplicationPeerManager.class);
+
+ private final Set<Peer> peers;
+
+ public AirReplicationPeerManager(List<Peer> peers) {
+ this.peers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ this.peers.addAll(peers);
+ if (this.peers.size() != peers.size()) {
+ LOGGER.warn("Duplicate peers in the input list, ignore the duplicates.");
+ }
+ }
+
+ public boolean contains(Peer peer) {
+ return peers.contains(peer);
+ }
+
+ public void addPeer(Peer peer) {
+ peers.add(peer);
+ }
+
+ public void removePeer(Peer peer) {
+ peers.remove(peer);
+ }
+
+ public List<Peer> getOtherPeers(Peer thisNode) {
+ return peers.stream()
+ .filter(peer -> !peer.equals(thisNode))
+ .collect(ImmutableList.toImmutableList());
+ }
+
+ public List<Peer> getPeers() {
+ return ImmutableList.copyOf(peers);
+ }
+
+ public void clear() {
+ peers.clear();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
new file mode 100644
index 00000000000..783c98d50f0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationServerImpl.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig.ReplicateMode;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.airreplication.ReplicateProgressManager;
+import org.apache.iotdb.consensus.air.metric.AirReplicationServerMetrics;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedReq;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedResp;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationReq;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationResp;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationReq;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationResp;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
+import
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+/** AirReplicationServerImpl is a replication server implementation for air
replication. */
+public class AirReplicationServerImpl {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AirReplicationServerImpl.class);
+ private static final long
CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
+ PerformanceOverviewMetrics.getInstance();
+ private static final long RETRY_WAIT_TIME_IN_MS = 500;
+ private static final long MAX_RETRY_TIMES = 20;
+ private final Peer thisNode;
+ private final IStateMachine stateMachine;
+ private final Lock stateMachineLock = new ReentrantLock();
+ private final AirReplicationPeerManager peerManager;
+ private final AtomicBoolean active;
+ private final AtomicBoolean isStarted;
+ private final String consensusGroupId;
+ private final AirReplicationManager airReplicationManager;
+ private final ReplicateProgressManager replicateProgressManager;
+ private final IClientManager<TEndPoint, SyncAirReplicationServiceClient>
syncClientManager;
+ private final AirReplicationServerMetrics airReplicationServerMetrics;
+ private final ReplicateMode replicateMode;
+
+ private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
+
+ public AirReplicationServerImpl(
+ Peer thisNode,
+ IStateMachine stateMachine,
+ List<Peer> peers,
+ AirReplicationConfig config,
+ AirReplicationManager airReplicationManager,
+ IClientManager<TEndPoint, SyncAirReplicationServiceClient>
syncClientManager)
+ throws IOException {
+ this.thisNode = thisNode;
+ this.stateMachine = stateMachine;
+ this.peerManager = new AirReplicationPeerManager(peers);
+ this.active = new AtomicBoolean(true);
+ this.isStarted = new AtomicBoolean(false);
+ this.consensusGroupId = thisNode.getGroupId().toString();
+ this.airReplicationManager = airReplicationManager;
+ this.replicateProgressManager = config.getAir().getProgressIndexManager();
+ this.syncClientManager = syncClientManager;
+ this.airReplicationServerMetrics = new AirReplicationServerMetrics(this);
+ this.replicateMode = config.getReplicateMode();
+
+ // if peers is empty, the `resetPeerList` will automatically fetch correct
peers' info from CN.
+ if (!peers.isEmpty()) {
+ // create air replications
+ Set<Peer> deepCopyPeersWithoutSelf =
+ peers.stream().filter(peer ->
!peer.equals(thisNode)).collect(Collectors.toSet());
+ final List<Peer> successfulAirs =
createAirReplications(deepCopyPeersWithoutSelf);
+ if (successfulAirs.size() < deepCopyPeersWithoutSelf.size()) {
+ // roll back
+ updateAirReplicationsStatus(successfulAirs, PipeStatus.DROPPED);
+ throw new IOException(String.format("%s cannot create all air
replications", thisNode));
+ }
+ }
+ }
+
+ @SuppressWarnings("java:S2276")
+ public synchronized void start(boolean startConsensusPipes) throws
IOException {
+ stateMachine.start();
+ MetricService.getInstance().addMetricSet(this.airReplicationServerMetrics);
+
+ if (startConsensusPipes) {
+ // start all air replications
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ List<Peer> failedAirs =
+ updateAirReplicationsStatus(new ArrayList<>(otherPeers),
PipeStatus.RUNNING);
+ // considering procedure can easily time out, keep trying
updateAirReplicationsStatus until all
+ // air replications are started gracefully or exceed the maximum number
of attempts.
+ // NOTE: start air procedure is idempotent guaranteed.
+ try {
+ for (int i = 0; i < MAX_RETRY_TIMES && !failedAirs.isEmpty(); i++) {
+ failedAirs = updateAirReplicationsStatus(failedAirs,
PipeStatus.RUNNING);
+ Thread.sleep(RETRY_WAIT_TIME_IN_MS);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ "AirReplicationImpl-peer{}: airReplicationImpl thread get
interrupted when start air replication. May because IoTDB process is killed.",
+ thisNode);
+ throw new IOException(String.format("%s cannot start all air
replications", thisNode));
+ }
+ // if there still are some air replications failed to start, throw an
exception.
+ if (!failedAirs.isEmpty()) {
+ // roll back
+ List<Peer> successfulAirs = new ArrayList<>(otherPeers);
+ successfulAirs.removeAll(failedAirs);
+ updateAirReplicationsStatus(successfulAirs, PipeStatus.STOPPED);
+ throw new IOException(String.format("%s cannot start all air
replications", thisNode));
+ }
+ }
+ isStarted.set(true);
+ }
+
+ public synchronized void stop() {
+ // stop all air replications
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ final List<Peer> failedAirs =
+ updateAirReplicationsStatus(new ArrayList<>(otherPeers),
PipeStatus.STOPPED);
+ if (!failedAirs.isEmpty()) {
+ // do not roll back, because it will stop anyway
+ LOGGER.warn("{} cannot stop all air replications", thisNode);
+ }
+
MetricService.getInstance().removeMetricSet(this.airReplicationServerMetrics);
+ stateMachine.stop();
+ isStarted.set(false);
+ }
+
+ public synchronized void clear() {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ final List<Peer> failedAirs =
+ updateAirReplicationsStatus(new ArrayList<>(otherPeers),
PipeStatus.DROPPED);
+ if (!failedAirs.isEmpty()) {
+ // do not roll back, because it will clear anyway
+ LOGGER.warn("{} cannot drop all air replications", thisNode);
+ }
+
+
MetricService.getInstance().removeMetricSet(this.airReplicationServerMetrics);
+ peerManager.clear();
+ stateMachine.stop();
+ isStarted.set(false);
+ active.set(false);
+ }
+
+ private List<Peer> createAirReplications(Set<Peer> peers) {
+ return peers.stream()
+ .filter(
+ peer -> {
+ try {
+ if (!peers.equals(thisNode)) {
+ airReplicationManager.createAirReplication(thisNode, peer);
+ }
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{}: cannot create air replication between {} and {}",
+ e.getMessage(),
+ thisNode,
+ peer,
+ e);
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * update given air replications' status, returns the peer corresponding to
the air that failed to
+ * update
+ */
+ private List<Peer> updateAirReplicationsStatus(List<Peer> peers, PipeStatus
status) {
+ return peers.stream()
+ .filter(
+ peer -> {
+ try {
+ if (!peer.equals(thisNode)) {
+ airReplicationManager.updateAirReplication(
+ new AirReplicationName(thisNode, peer), status);
+ }
+ return false;
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{}: cannot update air replication between {} and {} to
status {}",
+ e.getMessage(),
+ thisNode,
+ peer,
+ status);
+ return true;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ public synchronized void checkAirReplication(Map<AirReplicationName,
PipeStatus> existedAirs) {
+ final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING :
PipeStatus.STOPPED;
+ final Map<AirReplicationName, Peer> expectedAirs =
+ peerManager.getOtherPeers(thisNode).stream()
+ .collect(
+ ImmutableMap.toImmutableMap(
+ peer -> new AirReplicationName(thisNode, peer), peer ->
peer));
+
+ existedAirs.forEach(
+ (existedName, existedStatus) -> {
+ if (!expectedAirs.containsKey(existedName)) {
+ try {
+ LOGGER.warn("{} drop air replication [{}]", consensusGroupId,
existedName);
+ airReplicationManager.updateAirReplication(existedName,
PipeStatus.DROPPED);
+ } catch (Exception e) {
+ LOGGER.warn("{} cannot drop air replication [{}]",
consensusGroupId, existedName, e);
+ }
+ } else if (!expectedStatus.equals(existedStatus)) {
+ try {
+ LOGGER.warn(
+ "{} update air replication [{}] to status {}",
+ consensusGroupId,
+ existedName,
+ expectedStatus);
+ if (expectedStatus.equals(PipeStatus.RUNNING)) {
+ // Do nothing. Because Air framework's metaSync will do that.
+ return;
+ }
+ airReplicationManager.updateAirReplication(existedName,
expectedStatus);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot update air replication [{}] to status {}",
+ consensusGroupId,
+ existedName,
+ expectedStatus,
+ e);
+ }
+ }
+ });
+
+ expectedAirs.forEach(
+ (expectedName, expectedPeer) -> {
+ if (!existedAirs.containsKey(expectedName)) {
+ try {
+ LOGGER.warn(
+ "{} create and update air replication [{}] to status {}",
+ consensusGroupId,
+ expectedName,
+ expectedStatus);
+ airReplicationManager.createAirReplication(thisNode,
expectedPeer);
+ airReplicationManager.updateAirReplication(expectedName,
expectedStatus);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot create and update air replication [{}] to status
{}",
+ consensusGroupId,
+ expectedName,
+ expectedStatus,
+ e);
+ }
+ }
+ });
+ }
+
+ public TSStatus write(IConsensusRequest request) {
+ stateMachineLock.lock();
+ try {
+ long consensusWriteStartTime = System.nanoTime();
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ airReplicationServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
+ long writeToStateMachineStartTime = System.nanoTime();
+ if (request instanceof ComparableConsensusRequest) {
+ ((ComparableConsensusRequest) request)
+
.setProgressIndex(replicateProgressManager.assignProgressIndex(thisNode.getGroupId()));
+ }
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+ PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ // statistic the time of writing request into stateMachine
+ airReplicationServerMetrics.recordUserWriteStateMachineTime(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ return result;
+ } finally {
+ stateMachineLock.unlock();
+ }
+ }
+
+ public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
+ stateMachineLock.lock();
+ try {
+ long consensusWriteStartTime = System.nanoTime();
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ airReplicationServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
+
+ long writeToStateMachineStartTime = System.nanoTime();
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+
+ PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ // statistic the time of writing request into stateMachine
+ airReplicationServerMetrics.recordReplicaWriteStateMachineTime(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ return result;
+ } finally {
+ stateMachineLock.unlock();
+ }
+ }
+
+ public DataSet read(IConsensusRequest request) {
+ return stateMachine.read(request);
+ }
+
+ public void setRemotePeerActive(Peer peer, boolean isActive, boolean
isForDeletionPurpose)
+ throws ConsensusGroupModifyPeerException {
+ try (SyncAirReplicationServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ try {
+ TSetActiveResp res =
+ client.setActive(
+ new TSetActiveReq(
+ peer.getGroupId().convertToTConsensusGroupId(),
+ isActive,
+ isForDeletionPurpose));
+ if (!RpcUtils.SUCCESS_STATUS.equals(res.getStatus())) {
+ throw new ConsensusGroupModifyPeerException(
+ String.format(
+ "error when set peer %s to active %s. result status: %s",
+ peer, isActive, res.getStatus()));
+ }
+ } catch (Exception e) {
+ throw new ConsensusGroupModifyPeerException(
+ String.format("error when set peer %s to active %s", peer,
isActive), e);
+ }
+ } catch (ClientManagerException e) {
+ if (isForDeletionPurpose) {
+ // for remove peer, if target peer is already down, we can skip this
step.
+ LOGGER.warn(
+ "target peer may be down, error when set peer {} to active {}",
peer, isActive, e);
+ } else {
+ // for add peer, if target peer is down, we need to throw exception to
identify the failure
+ // of this addPeerProcedure.
+ throw new ConsensusGroupModifyPeerException(e);
+ }
+ }
+ }
+
+ public void notifyPeersToCreateAirReplications(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ for (Peer peer : otherPeers) {
+ if (peer.equals(targetPeer)) {
+ continue;
+ }
+ try (SyncAirReplicationServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ TNotifyPeerToCreateAirReplicationResp resp =
+ client.notifyPeerToCreateAirReplication(
+ new TNotifyPeerToCreateAirReplicationReq(
+ targetPeer.getGroupId().convertToTConsensusGroupId(),
+ targetPeer.getEndpoint(),
+ targetPeer.getNodeId()));
+ if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
+ throw new ConsensusGroupModifyPeerException(
+ String.format("error when notify peer %s to create air
replication", peer));
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot notify peer {} to create air replication, may because
that peer is unknown currently, please manually check!",
+ thisNode,
+ peer,
+ e);
+ }
+ }
+
+ try {
+ // This node which acts as coordinator will transfer complete historical
snapshot to new
+ // target.
+ createAirReplicationToTargetPeer(targetPeer, false);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot create air replication to {}, may because target peer is
unknown currently, please manually check!",
+ thisNode,
+ targetPeer,
+ e);
+ throw new ConsensusGroupModifyPeerException(e);
+ }
+ }
+
+ public synchronized void createAirReplicationToTargetPeer(
+ Peer targetPeer, boolean needManuallyStart) throws
ConsensusGroupModifyPeerException {
+ try {
+ KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
+ airReplicationManager.createAirReplication(thisNode, targetPeer,
needManuallyStart);
+ peerManager.addPeer(targetPeer);
+ } catch (Exception e) {
+ LOGGER.warn("{} cannot create air replication to {}", thisNode,
targetPeer, e);
+ throw new ConsensusGroupModifyPeerException(
+ String.format("%s cannot create air replication to %s", thisNode,
targetPeer), e);
+ }
+ }
+
+ public void notifyPeersToDropAirReplication(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ for (Peer peer : otherPeers) {
+ if (peer.equals(targetPeer)) {
+ continue;
+ }
+ try (SyncAirReplicationServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ TNotifyPeerToDropAirReplicationResp resp =
+ client.notifyPeerToDropAirReplication(
+ new TNotifyPeerToDropAirReplicationReq(
+ targetPeer.getGroupId().convertToTConsensusGroupId(),
+ targetPeer.getEndpoint(),
+ targetPeer.getNodeId()));
+ if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
+ throw new ConsensusGroupModifyPeerException(
+ String.format("error when notify peer %s to drop air
replication", peer));
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot notify peer {} to drop air replication, may because
that peer is unknown currently, please manually check!",
+ thisNode,
+ peer,
+ e);
+ }
+ }
+
+ try {
+ dropAirReplicationToTargetPeer(targetPeer);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "{} cannot drop air replication to {}, may because target peer is
unknown currently, please manually check!",
+ thisNode,
+ targetPeer,
+ e);
+ throw new ConsensusGroupModifyPeerException(e);
+ }
+ }
+
+ public synchronized void dropAirReplicationToTargetPeer(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ try {
+ airReplicationManager.dropAirReplication(thisNode, targetPeer);
+ peerManager.removePeer(targetPeer);
+ } catch (Exception e) {
+ LOGGER.warn("{} cannot drop air replication to {}", thisNode,
targetPeer, e);
+ throw new ConsensusGroupModifyPeerException(
+ String.format("%s cannot drop air replication to %s", thisNode,
targetPeer), e);
+ }
+ }
+
+ public void startOtherAirReplicationsToTargetPeer(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+ for (Peer peer : otherPeers) {
+ if (peer.equals(targetPeer)) {
+ continue;
+ }
+ try {
+ airReplicationManager.updateAirReplication(
+ new AirReplicationName(peer, targetPeer), PipeStatus.RUNNING);
+ } catch (Exception e) {
+ // just warn but not throw exceptions. Because there may exist unknown
nodes in replication
+ // group
+ LOGGER.warn("{} cannot start air replication to {}", peer, targetPeer,
e);
+ }
+ }
+ }
+
+ /** Wait for the user written data up to firstCheck to be replicated */
+ public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ boolean isTransmissionCompleted = false;
+ boolean isFirstCheckForCurrentPeer = true;
+ boolean isFirstCheckForOtherPeers = true;
+
+ try {
+ while (!isTransmissionCompleted) {
+ Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
+
+ if (isAirReplicationsTransmissionCompleted(
+ Collections.singletonList(new AirReplicationName(thisNode,
targetPeer).toString()),
+ isFirstCheckForCurrentPeer)) {
+ final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
+
+ isTransmissionCompleted = true;
+ for (Peer peer : otherPeers) {
+ if (!peer.equals(targetPeer)) {
+ isTransmissionCompleted &=
+ isRemotePeerAirReplicationsTransmissionCompleted(
+ peer,
+ Collections.singletonList(new AirReplicationName(peer,
targetPeer).toString()),
+ isFirstCheckForOtherPeers);
+ }
+ }
+ isFirstCheckForOtherPeers = false;
+ }
+ isFirstCheckForCurrentPeer = false;
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("{} is interrupted when waiting for transfer completed",
thisNode, e);
+ Thread.currentThread().interrupt();
+ throw new ConsensusGroupModifyPeerException(
+ String.format("%s is interrupted when waiting for transfer
completed", thisNode), e);
+ }
+ }
+
+ /** Wait for the user written data up to firstCheck to be replicated */
+ public void waitTargetPeerToPeersTransmissionCompleted(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ boolean isTransmissionCompleted = false;
+ boolean isFirstCheck = true;
+
+ try {
+ while (!isTransmissionCompleted) {
+ Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
+
+ final List<String> airReplicationNames =
+ peerManager.getPeers().stream()
+ .filter(peer -> !peer.equals(targetPeer))
+ .map(peer -> new AirReplicationName(targetPeer,
peer).toString())
+ .collect(Collectors.toList());
+ isTransmissionCompleted =
+ isRemotePeerAirReplicationsTransmissionCompleted(
+ targetPeer, airReplicationNames, isFirstCheck);
+
+ isFirstCheck = false;
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("{} is interrupted when waiting for transfer completed",
thisNode, e);
+ Thread.currentThread().interrupt();
+ throw new ConsensusGroupModifyPeerException(
+ String.format("%s is interrupted when waiting for transfer
completed", thisNode), e);
+ }
+ }
+
+ private boolean isRemotePeerAirReplicationsTransmissionCompleted(
+ Peer targetPeer, List<String> airReplicationNames, boolean
refreshCachedProgressIndex) {
+ try (SyncAirReplicationServiceClient client =
+ syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+ TCheckAirReplicationCompletedResp resp =
+ client.checkAirReplicationCompleted(
+ new TCheckAirReplicationCompletedReq(
+ thisNode.getGroupId().convertToTConsensusGroupId(),
+ airReplicationNames,
+ refreshCachedProgressIndex));
+ if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
+ LOGGER.warn(
+ "{} cannot check air replications transmission completed to peer
{}",
+ thisNode,
+ targetPeer);
+ throw new ConsensusGroupModifyPeerException(
+ String.format(
+ "error when check air replications transmission completed to
peer %s", targetPeer));
+ }
+ return resp.isCompleted;
+ } catch (Exception e) {
+ LOGGER.warn("{} cannot check air replications transmission completed",
thisNode, e);
+ return true;
+ }
+ }
+
+ public boolean isAirReplicationsTransmissionCompleted(List<String>
airReplicationNames) {
+ return airReplicationNames.stream()
+ .noneMatch(
+ airName ->
+ replicateProgressManager.getSyncLagForSpecificAirReplication(
+ thisNode.getGroupId(), new AirReplicationName(airName))
+ > 0);
+ }
+
+ public synchronized boolean isAirReplicationsTransmissionCompleted(
+ List<String> airReplicationNames, boolean refreshCachedProgressIndex) {
+ if (refreshCachedProgressIndex) {
+ cachedProgressIndex =
+ cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
+
replicateProgressManager.getMaxAssignedProgressIndex(thisNode.getGroupId()));
+ }
+
+ try {
+ return airReplicationNames.stream()
+ .noneMatch(
+ name ->
+ cachedProgressIndex.isAfter(
+ replicateProgressManager.getProgressIndex(new
AirReplicationName(name))));
+ } catch (PipeException e) {
+ LOGGER.info(e.getMessage());
+ return false;
+ }
+ }
+
+ public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ long checkIntervalInMs = 10_000L;
+ try (SyncAirReplicationServiceClient client =
+ syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+ while (true) {
+ TWaitReleaseAllRegionRelatedResourceResp res =
+ client.waitReleaseAllRegionRelatedResource(
+ new TWaitReleaseAllRegionRelatedResourceReq(
+ targetPeer.getGroupId().convertToTConsensusGroupId()));
+ if (res.releaseAllResource) {
+ LOGGER.info("[WAIT RELEASE] {} has released all region related
resource", targetPeer);
+ return;
+ }
+ LOGGER.info("[WAIT RELEASE] {} is still releasing all region related
resource", targetPeer);
+ Thread.sleep(checkIntervalInMs);
+ }
+ } catch (ClientManagerException | TException e) {
+ // in case of target peer is down or can not serve, we simply skip it.
+ LOGGER.warn(
+ String.format(
+ "error when waiting %s to release all region related resource.
%s",
+ targetPeer, e.getMessage()),
+ e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsensusGroupModifyPeerException(
+ String.format(
+ "thread interrupted when waiting %s to release all region
related resource. %s",
+ targetPeer, e.getMessage()),
+ e);
+ }
+ }
+
+ public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
+ return stateMachine.hasReleaseAllRegionRelatedResource(groupId);
+ }
+
+ public boolean isReadOnly() {
+ return stateMachine.isReadOnly();
+ }
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public void setActive(boolean active) {
+ LOGGER.info("set {} active status to {}", this.thisNode, active);
+ this.active.set(active);
+ }
+
+ public boolean containsPeer(Peer peer) {
+ return peerManager.contains(peer);
+ }
+
+ public List<Peer> getPeers() {
+ return peerManager.getPeers();
+ }
+
+ public String getConsensusGroupId() {
+ return consensusGroupId;
+ }
+
+ public long getReplicateMode() {
+ return (replicateMode == ReplicateMode.BATCH) ? 2 : 1;
+ }
+
+ public Peer getThisNodePeer() {
+ return thisNode;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
new file mode 100644
index 00000000000..77b217625b6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
/**
 * {@link IConsensus} implementation that keeps replicas in sync by shipping user writes through
 * "air replication" (pipe-based) tasks rather than a classic quorum protocol. Every local node
 * reports itself as leader of its groups (see {@code isLeader}), and a background guardian job
 * periodically reconciles existing air replication tasks against the known group topology.
 *
 * <p>NOTE(review): the diff header places this class in {@code PipeConsensus.java} under
 * {@code org.apache.iotdb.db.airreplication}, yet the class is named {@code AirReplication} and
 * declared in package {@code org.apache.iotdb.consensus.air} — confirm the intended file and
 * package naming before merging.
 */
public class AirReplication implements IConsensus {
  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = "replication_air_guardian";
  private static final String CLASS_NAME = AirReplication.class.getSimpleName();
  private static final Logger LOGGER = LoggerFactory.getLogger(AirReplication.class);

  // Identity of the local node.
  private final TEndPoint thisNode;
  private final int thisNodeId;
  // Root directory under which one sub-directory per consensus group is persisted.
  private final File storageDir;
  private final IStateMachine.Registry registry;
  // One server impl per locally hosted consensus group.
  private final Map<ConsensusGroupId, AirReplicationServerImpl> stateMachineMap =
      new ConcurrentHashMap<>();
  private final AirReplicationRPCService rpcService;
  private final RegisterManager registerManager = new RegisterManager();
  // Per-group locks serializing createLocalPeer/deleteLocalPeer for the same group.
  private final Map<ConsensusGroupId, ReentrantLock> replicationGroupIdReentrantLockMap =
      new ConcurrentHashMap<>();
  // Read lock: per-group create/delete; write lock: the guardian's full reconciliation pass.
  private final ReentrantReadWriteLock stateMachineMapLock = new ReentrantReadWriteLock();
  private final AirReplicationConfig config;
  private final AirReplicationManager airReplicationManager;
  private final AirReplicationGuardian airReplicationGuardian;
  private final IClientManager<TEndPoint, AsyncAirReplicationServiceClient> asyncClientManager;
  private final IClientManager<TEndPoint, SyncAirReplicationServiceClient> syncClientManager;
  // When non-null, the authoritative peer lists recorded before start(); used to correct or
  // clear stale local groups during recovery.
  private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;

  public AirReplication(ConsensusConfig config, IStateMachine.Registry registry) {
    this.thisNode = config.getThisNodeEndPoint();
    this.thisNodeId = config.getThisNodeId();
    this.storageDir = new File(config.getStorageDir());
    this.config = config.getAirReplicationConfig();
    this.registry = registry;
    this.rpcService = new AirReplicationRPCService(thisNode, config.getAirReplicationConfig());
    this.airReplicationManager =
        new AirReplicationManager(
            config.getAirReplicationConfig().getAir(),
            config.getAirReplicationConfig().getReplicateMode());
    this.airReplicationGuardian =
        config.getAirReplicationConfig().getAir().getAirReplicationGuardian();
    this.asyncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
    this.syncClientManager =
        IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
  }

  /**
   * Starts the consensus layer: launches (possibly asynchronous) recovery of persisted groups,
   * registers the RPC service, waits for recovery to finish, then starts the background guardian
   * that periodically reconciles air replication tasks.
   *
   * @throws IOException if the storage dir cannot be created or the RPC service fails to register
   */
  @Override
  public synchronized void start() throws IOException {
    Future<Void> recoverFuture = initAndRecover();

    rpcService.initSyncedServiceImpl(new AirReplicationRPCServiceProcessor(this, config.getAir()));
    try {
      registerManager.register(rpcService);
    } catch (StartupException e) {
      throw new IOException(e);
    }

    try {
      recoverFuture.get();
    } catch (CancellationException ce) {
      LOGGER.info("IoTV2 Recover Task is cancelled", ce);
    } catch (ExecutionException ee) {
      // Recovery failures are logged but do not abort startup; failed groups are handled by the
      // guardian (see initAndRecover).
      LOGGER.error("Exception while waiting for recover future completion", ee);
    } catch (InterruptedException ie) {
      // Preserve the interrupt flag for callers.
      Thread.currentThread().interrupt();
      LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
    }
    // only when we recover all replication group can we launch async backend checker thread
    airReplicationGuardian.start(
        REPLICATION_AIR_GUARDIAN_TASK_ID,
        this::checkAllAirReplication,
        config.getAir().getAirReplicationGuardJobIntervalInSeconds());
  }

  /**
   * Creates the storage dir on first boot, or asynchronously rebuilds one server impl per
   * persisted group sub-directory. Per-group recovery failures are logged and skipped; the
   * guardian later deregisters air side effects for such groups.
   *
   * @return a future completing when recovery is done (already completed on first boot)
   * @throws IOException if the storage dir does not exist and cannot be created
   */
  private Future<Void> initAndRecover() throws IOException {
    if (!storageDir.exists()) {
      // init
      if (!storageDir.mkdirs()) {
        LOGGER.warn("Unable to create replication dir at {}", storageDir);
        throw new IOException(String.format("Unable to create replication dir at %s", storageDir));
      }
      return CompletableFuture.completedFuture(null);
    } else {
      // asynchronously recover, retry logic is implemented at AirReplicationImpl
      return CompletableFuture.runAsync(
              () -> {
                try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
                  for (Path path : stream) {
                    ConsensusGroupId consensusGroupId =
                        parsePeerFileName(path.getFileName().toString());
                    try {
                      AirReplicationServerImpl replication =
                          new AirReplicationServerImpl(
                              new Peer(consensusGroupId, thisNodeId, thisNode),
                              registry.apply(consensusGroupId),
                              new ArrayList<>(),
                              config,
                              airReplicationManager,
                              syncClientManager);
                      stateMachineMap.put(consensusGroupId, replication);
                      checkPeerListAndStartIfEligible(consensusGroupId, replication);
                    } catch (Exception e) {
                      LOGGER.error(
                          "Failed to recover replication from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related air side effects for this failed replication group.",
                          storageDir,
                          consensusGroupId,
                          e);
                    }
                  }
                } catch (IOException e) {
                  LOGGER.error(
                      "Failed to recover replication from {} because read dir failed", storageDir, e);
                }
              })
          .exceptionally(
              e -> {
                LOGGER.error("Failed to recover replication from {}", storageDir, e);
                return null;
              });
    }
  }

  /**
   * Starts a recovered group, first reconciling its peer list against
   * {@code correctPeerListBeforeStart} when that was recorded: groups present in the recorded map
   * are corrected and started; groups absent from it have their peers cleared (which deletes the
   * local peer, see {@code resetPeerList}) and are not started.
   */
  private void checkPeerListAndStartIfEligible(
      ConsensusGroupId consensusGroupId, AirReplicationServerImpl replication) throws IOException {
    BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
        (dataRegionId, peers) -> {
          try {
            resetPeerList(dataRegionId, peers);
          } catch (ConsensusGroupNotExistException ignore) {
            // The group vanished between recovery and reset; nothing left to correct.
          } catch (Exception e) {
            LOGGER.warn("Failed to reset peer list while start", e);
          }
        };

    if (correctPeerListBeforeStart != null) {
      if (correctPeerListBeforeStart.containsKey(consensusGroupId)) {
        // make peers which are in list correct
        resetPeerListWithoutThrow.accept(
            consensusGroupId, correctPeerListBeforeStart.get(consensusGroupId));
        replication.start(true);
      } else {
        // clear peers which are not in the list
        resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList());
      }

    } else {
      replication.start(true);
    }
  }

  /**
   * Stops the consensus layer: closes client managers, deregisters services, stops the guardian,
   * stops every group server in parallel, and shuts down the shared background task service.
   *
   * <p>NOTE(review): client managers are closed before the group servers that use
   * {@code syncClientManager} are stopped — confirm this shutdown order is intentional.
   */
  @Override
  public synchronized void stop() {
    asyncClientManager.close();
    syncClientManager.close();
    registerManager.deregisterAll();
    airReplicationGuardian.stop();
    stateMachineMap.values().parallelStream().forEach(AirReplicationServerImpl::stop);
    IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
  }

  /**
   * Guardian job: groups all air replications sent from this node by consensus group, lets every
   * live group check its own replications, and drops replications whose group no longer exists
   * locally. Runs under the write lock so no group is created/deleted mid-reconciliation.
   */
  private void checkAllAirReplication() {
    final Map<ConsensusGroupId, Map<AirReplicationName, PipeStatus>> existedAirs =
        airReplicationManager.getAllAirReplication().entrySet().stream()
            .filter(entry -> entry.getKey().getSenderDataNodeId() == thisNodeId)
            .collect(
                Collectors.groupingBy(
                    entry -> entry.getKey().getConsensusGroupId(),
                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    stateMachineMapLock.writeLock().lock();
    try {
      stateMachineMap.forEach(
          (key, value) ->
              value.checkAirReplication(existedAirs.getOrDefault(key, ImmutableMap.of())));
      // Replications that reference groups no longer hosted here are orphans: drop them.
      existedAirs.entrySet().stream()
          .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
          .flatMap(entry -> entry.getValue().keySet().stream())
          .forEach(
              airReplicationName -> {
                try {
                  LOGGER.warn(
                      "{} drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName);
                  airReplicationManager.updateAirReplication(airReplicationName, PipeStatus.DROPPED);
                } catch (Exception e) {
                  LOGGER.warn(
                      "{} cannot drop air replication [{}]",
                      airReplicationName.getConsensusGroupId(),
                      airReplicationName,
                      e);
                }
              });
    } finally {
      stateMachineMapLock.writeLock().unlock();
    }
  }

  /**
   * Applies a write to the given group. Rejected with SYSTEM_READ_ONLY when the state machine is
   * read-only, or WRITE_PROCESS_REJECT while the local peer is inactive (e.g. mid-migration).
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    final AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.isReadOnly()) {
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
    } else if (!impl.isActive()) {
      return RpcUtils.getStatus(
          TSStatusCode.WRITE_PROCESS_REJECT,
          "current node is not active and is not ready to receive user write.");
    } else {
      return impl.write(request);
    }
  }

  /**
   * Serves a read from the given group's state machine.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
      throws ConsensusException {
    return Optional.ofNullable(stateMachineMap.get(groupId))
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
        .read(request);
  }

  /** Returns the on-disk directory for a group: {@code <storageDir>/<type>_<id>}. */
  private String getPeerDir(ConsensusGroupId groupId) {
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
  }

  /** Inverse of {@code getPeerDir}'s naming: parses "{@code <type>_<id>}" back into a group id. */
  private ConsensusGroupId parsePeerFileName(String fileName) {
    String[] items = fileName.split("_");
    return ConsensusGroupId.Factory.create(Integer.parseInt(items[0]), Integer.parseInt(items[1]));
  }

  /**
   * Creates the local peer for a new group: validates the peer list contains this node, creates
   * the group directory, builds and starts the server impl. Serialized per group via
   * {@code replicationGroupIdReentrantLockMap}, and takes the map read lock so it cannot overlap
   * the guardian's reconciliation pass.
   *
   * @throws ConsensusException if the peer list is invalid, the group already exists, or the
   *     group dir cannot be created
   */
  @Override
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
      throws ConsensusException {
    final int replicationGroupSize = peers.size();
    if (replicationGroupSize == 0) {
      throw new IllegalPeerNumException(replicationGroupSize);
    }
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      throw new IllegalPeerEndpointException(thisNode, peers);
    }

    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupAlreadyExistException(groupId);
        }

        final String path = getPeerDir(groupId);
        File replicationDir = new File(path);
        if (!replicationDir.exists() && !replicationDir.mkdirs()) {
          LOGGER.warn("Unable to create replication dir for group {} at {}", groupId, path);
          throw new ConsensusException(
              String.format("Unable to create replication dir for group %s", groupId));
        }

        AirReplicationServerImpl replication =
            new AirReplicationServerImpl(
                new Peer(groupId, thisNodeId, thisNode),
                registry.apply(groupId),
                peers,
                config,
                airReplicationManager,
                syncClientManager);
        stateMachineMap.put(groupId, replication);
        replication.start(false); // air will start after creating
        KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
      } catch (IOException e) {
        LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
        throw new ConsensusException(e);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Deletes the local peer of a group: clears the server impl, removes it from the map, and
   * deletes the group directory. Uses the same per-group lock as {@code createLocalPeer}.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
    Lock lock =
        replicationGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
    lock.lock();
    try {
      stateMachineMapLock.readLock().lock();
      try {
        if (!stateMachineMap.containsKey(groupId)) {
          throw new ConsensusGroupNotExistException(groupId);
        }
        LOGGER.info("[{}] start to delete local peer for group {}", CLASS_NAME, groupId);
        final AirReplicationServerImpl replication = stateMachineMap.get(groupId);
        replication.clear();
        stateMachineMap.remove(groupId);

        FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
        KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
        LOGGER.info("[{}] finish deleting local peer for group {}", CLASS_NAME, groupId);
      } finally {
        stateMachineMapLock.readLock().unlock();
      }
    } finally {
      lock.unlock();
      // Drop the per-group lock entry after the group is gone so the map does not grow unbounded.
      // NOTE(review): a thread blocked on this lock may re-create the entry; confirm this race is
      // benign for create-after-delete sequences.
      replicationGroupIdReentrantLockMap.remove(groupId);
    }
  }

  /**
   * Coordinates adding a remote peer: (1) deactivate the new peer, (2) have existing peers create
   * air replications towards it, (3) wait for transmission to complete, (4) activate it. On
   * failure the created replications are rolled back (best effort).
   *
   * @throws ConsensusException if the group is missing, the peer is already a member, or any
   *     coordination step fails
   */
  @Override
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (impl.containsPeer(peer)) {
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
    }
    try {
      // step 1: inactive new Peer to prepare for following steps
      LOGGER.info("[{}] inactivate new peer: {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, false);

      // step 2: notify all the other Peers to create air replications to newPeer
      // NOTE: For this step, all the other peers will try to transfer its user write data to target
      LOGGER.info("[{}] notify current peers to create air replications...", CLASS_NAME);
      impl.notifyPeersToCreateAirReplications(peer);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

      // step 3: wait until all other Peers finish transferring
      LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
      impl.waitPeersToTargetPeerTransmissionCompleted(peer);

      // step 4: active new Peer to let new Peer receive client requests
      LOGGER.info("[{}] activate new peer...", CLASS_NAME);
      impl.setRemotePeerActive(peer, true, false);
      KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
    } catch (ConsensusGroupModifyPeerException e) {
      try {
        LOGGER.warn(
            "[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME, e);

        // roll back
        impl.notifyPeersToDropAirReplication(peer);

      } catch (ConsensusGroupModifyPeerException mpe) {
        LOGGER.error(
            "[{}] failed to cleanup side effects after failed to add remote peer", CLASS_NAME, mpe);
      }
      throw new ConsensusException(e);
    }
  }

  /**
   * Coordinates removing a remote peer: drop replications towards it, deactivate it, wait for its
   * outbound transmission to drain, then wait until it releases all region-related resources.
   *
   * @throws ConsensusException if the group is missing, the peer is not a member, or any
   *     coordination step fails
   */
  @Override
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
    if (!impl.containsPeer(peer)) {
      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

    try {
      // let other peers to drop air replications to target
      LOGGER.info("[{}] notify other peers to drop air replications...", CLASS_NAME);
      impl.notifyPeersToDropAirReplication(peer);
      KillPoint.setKillPoint(
          IoTConsensusRemovePeerCoordinatorKillPoints
              .AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);

      // let target peer reject new write
      LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
      impl.setRemotePeerActive(peer, false, true);
      KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);

      // wait its air replications to complete
      LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME, peer);
      impl.waitTargetPeerToPeersTransmissionCompleted(peer);

      // wait target peer to release all resource
      LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
      impl.waitReleaseAllRegionRelatedResource(peer);
    } catch (ConsensusGroupModifyPeerException e) {
      throw new ConsensusException(e);
    }
    KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
  }

  /**
   * Records the authoritative peer lists used during {@code start()} recovery to correct or clear
   * stale local groups (see {@code checkPeerListAndStartIfEligible}).
   */
  @Override
  public void recordCorrectPeerListBeforeStarting(
      Map<ConsensusGroupId, List<Peer>> correctPeerList) {
    LOGGER.info("Record correct peer list: {}", correctPeerList);
    this.correctPeerListBeforeStart = correctPeerList;
  }

  /**
   * Reconciles the group's peer list with {@code correctPeers}: deletes the local peer when this
   * node is not in the correct list; otherwise drops sync channels to removed peers and builds
   * channels to added ones. Per-peer failures are logged rather than propagated.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
      throws ConsensusException {
    AirReplicationServerImpl impl =
        Optional.ofNullable(stateMachineMap.get(groupId))
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));

    if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) {
      LOGGER.warn(
          "[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.",
          groupId);
      deleteLocalPeer(groupId);
      return;
    }

    // Snapshot current peers before mutating so removal iterates a stable list.
    ImmutableList<Peer> currentPeers = ImmutableList.copyOf(impl.getPeers());
    String previousPeerListStr = impl.getPeers().toString();
    // remove invalid peer
    for (Peer peer : currentPeers) {
      if (!correctPeers.contains(peer)) {
        try {
          impl.dropAirReplicationToTargetPeer(peer);
          LOGGER.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.error(
              "[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // add correct peer
    for (Peer peer : correctPeers) {
      if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) {
        try {
          impl.createAirReplicationToTargetPeer(peer, false);
          LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer);
        } catch (ConsensusGroupModifyPeerException e) {
          LOGGER.warn(
              "[RESET PEER LIST] {} Failed to build sync channel with: {}", groupId, peer, e);
        }
      }
    }
    // show result
    String currentPeerListStr = impl.getPeers().toString();
    if (!previousPeerListStr.equals(currentPeerListStr)) {
      LOGGER.info(
          "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
          groupId,
          previousPeerListStr,
          impl.getPeers());
    } else {
      LOGGER.info(
          "[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}",
          groupId,
          previousPeerListStr);
    }
  }

  /** Leader transfer is meaningless here (every node is leader); always throws. */
  @Override
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
    throw new ConsensusException(String.format("%s does not support leader transfer", CLASS_NAME));
  }

  /**
   * Snapshot trigger is a no-op for this implementation; only validates the group exists.
   *
   * @throws ConsensusException if the group does not exist on this node
   */
  @Override
  public void triggerSnapshot(ConsensusGroupId groupId, boolean force) throws ConsensusException {
    if (!stateMachineMap.containsKey(groupId)) {
      throw new ConsensusGroupNotExistException(groupId);
    }
    // Do nothing here because we do not need to transfer snapshot when there are new peers
  }

  /** Every local node is considered leader of its groups in this replication scheme. */
  @Override
  public boolean isLeader(ConsensusGroupId groupId) {
    return true;
  }

  /** Always 0 for now. */
  @Override
  public long getLogicalClock(ConsensusGroupId groupId) {
    // TODO: check logical clock
    return 0;
  }

  /** Leader is always ready since the local node is always leader. */
  @Override
  public boolean isLeaderReady(ConsensusGroupId groupId) {
    return true;
  }

  /** Returns the local node as leader of the group, or null if the group is not hosted here. */
  @Override
  public Peer getLeader(ConsensusGroupId groupId) {
    if (!stateMachineMap.containsKey(groupId)) {
      return null;
    }
    return new Peer(groupId, thisNodeId, thisNode);
  }

  /** Returns the group's peer count, or 0 when the group is not hosted on this node. */
  @Override
  public int getReplicationNum(ConsensusGroupId groupId) {
    AirReplicationServerImpl impl = stateMachineMap.get(groupId);
    return impl != null ? impl.getPeers().size() : 0;
  }

  /** Returns a snapshot of all locally hosted consensus group ids. */
  @Override
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
    return new ArrayList<>(stateMachineMap.keySet());
  }

  /** Returns the on-disk region directory for the group (same as the peer dir). */
  @Override
  public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
    return getPeerDir(groupId);
  }

  @Override
  public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
    // AirReplication doesn't support reload consensus config, related config can be reloaded in
    // iotdb-core layer.
  }

  /** Returns the server impl for the group, or null if the group is not hosted on this node. */
  public AirReplicationServerImpl getImpl(ConsensusGroupId groupId) {
    return stateMachineMap.get(groupId);
  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
new file mode 100644
index 00000000000..dd7ada46d48
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import java.util.Map;
+
/**
 * Bridges the consensus layer to the underlying air (pipe) task engine: implementations create,
 * start, stop and drop the air replication tasks that carry user writes between peers.
 */
public interface AirReplicationDispatcher {
  /**
   * Creates an air replication task from its three stages' attribute maps.
   *
   * @param airName unique name of the air replication task
   * @param extractorAttributes attributes of the source/extractor stage
   * @param processorAttributes attributes of the processor stage
   * @param connectorAttributes attributes of the sink/connector stage
   * @param needManuallyStart if true, the task presumably stays stopped until {@code startAir}
   *     is called — confirm against the implementation
   * @throws Exception if the underlying engine fails to create the task
   */
  void createAir(
      String airName,
      Map<String, String> extractorAttributes,
      Map<String, String> processorAttributes,
      Map<String, String> connectorAttributes,
      boolean needManuallyStart)
      throws Exception;

  /** Starts the air replication task with the given name. */
  void startAir(String airName) throws Exception;

  /** Stops the air replication task with the given name. */
  void stopAir(String airName) throws Exception;

  /**
   * Use AirReplicationName instead of String to provide information for receiverAgent to release
   * corresponding resource
   */
  void dropAir(AirReplicationName airName) throws Exception;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
new file mode 100644
index 00000000000..91a3b82480a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
/** Schedules a periodic background guard job that reconciles air replication state. */
public interface AirReplicationGuardian {
  /**
   * Registers {@code guardJob} under {@code id} to run every {@code intervalInSeconds} seconds.
   */
  void start(String id, Runnable guardJob, long intervalInSeconds);

  /** Cancels the scheduled guard job. */
  void stop();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
new file mode 100644
index 00000000000..897a4278b80
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig.ReplicateMode;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.external.commons.lang3.tuple.ImmutableTriple;
+import org.apache.tsfile.external.commons.lang3.tuple.Triple;
+
+import java.util.Map;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CAPTURE_TREE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_INCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
+
+public class AirReplicationManager {
+ // Extract data.insert and data.delete to support deletion.
+ private static final String REPLICATION_EXTRACTOR_INCLUSION_VALUE = "data";
+ private final AirReplicationConfig.Air config;
+ private final ReplicateMode replicateMode;
+ private final AirReplicationDispatcher dispatcher;
+ private final AirReplicationSelector selector;
+
+ public AirReplicationManager(AirReplicationConfig.Air config, ReplicateMode
replicateMode) {
+ this.config = config;
+ this.replicateMode = replicateMode;
+ this.dispatcher = config.getAirReplicationDispatcher();
+ this.selector = config.getAirReplicationSelector();
+ }
+
+ /** This method is used except region migration. */
+ public void createAirReplication(Peer senderPeer, Peer receiverPeer) throws
Exception {
+ AirReplicationName airReplicationName = new AirReplicationName(senderPeer,
receiverPeer);
+ // The third parameter is only used when region migration. Since this
method is not called by
+ // region migration, just pass senderPeer in to get the correct result.
+ Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
+ params = buildAirParams(senderPeer, receiverPeer);
+ dispatcher.createAir(
+ airReplicationName.toString(),
+ params.getLeft(),
+ params.getMiddle(),
+ params.getRight(),
+ false);
+ }
+
+ /** This method is used when executing region migration */
+ public void createAirReplication(Peer senderPeer, Peer receiverPeer, boolean
needManuallyStart)
+ throws Exception {
+ AirReplicationName airReplicationName = new AirReplicationName(senderPeer,
receiverPeer);
+ Triple<ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
+ params = buildAirParams(senderPeer, receiverPeer);
+ dispatcher.createAir(
+ airReplicationName.toString(),
+ params.getLeft(),
+ params.getMiddle(),
+ params.getRight(),
+ needManuallyStart);
+ }
+
+ public Triple<
+ ImmutableMap<String, String>, ImmutableMap<String, String>,
ImmutableMap<String, String>>
+ buildAirParams(final Peer senderPeer, final Peer receiverPeer) {
+ final AirReplicationName airReplicationName = new
AirReplicationName(senderPeer, receiverPeer);
+ return new ImmutableTriple<>(
+ ImmutableMap.<String, String>builder()
+ .put(EXTRACTOR_KEY, config.getExtractorPluginName())
+ .put(EXTRACTOR_INCLUSION_KEY,
REPLICATION_EXTRACTOR_INCLUSION_VALUE)
+ .put(
+ EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
+ airReplicationName.getConsensusGroupId().toString())
+ .put(
+ EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY,
+ String.valueOf(airReplicationName.getSenderDataNodeId()))
+ .put(
+ EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
+ String.valueOf(airReplicationName.getReceiverDataNodeId()))
+ .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+ .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+ .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
+ .put(
+ EXTRACTOR_IOTDB_USER_KEY,
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName())
+ .build(),
+ ImmutableMap.<String, String>builder()
+ .put(PROCESSOR_KEY, config.getProcessorPluginName())
+ .build(),
+ ImmutableMap.<String, String>builder()
+ .put(CONNECTOR_KEY, config.getConnectorPluginName())
+ .put(
+ CONNECTOR_CONSENSUS_GROUP_ID_KEY,
+
String.valueOf(airReplicationName.getConsensusGroupId().getId()))
+ .put(CONNECTOR_CONSENSUS_PIPE_NAME, airReplicationName.toString())
+ .put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
+ .put(CONNECTOR_IOTDB_PORT_KEY,
String.valueOf(receiverPeer.getEndpoint().port))
+ .put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
+ .put(CONNECTOR_REALTIME_FIRST_KEY, String.valueOf(false))
+ .build());
+ }
+
+ public void dropAirReplication(Peer senderPeer, Peer receiverPeer) throws
Exception {
+ AirReplicationName airReplicationName = new AirReplicationName(senderPeer,
receiverPeer);
+ dispatcher.dropAir(airReplicationName);
+ }
+
+ public void updateAirReplication(AirReplicationName airReplicationName,
PipeStatus pipeStatus)
+ throws Exception {
+ if (PipeStatus.RUNNING.equals(pipeStatus)) {
+ dispatcher.startAir(airReplicationName.toString());
+ } else if (PipeStatus.STOPPED.equals(pipeStatus)) {
+ dispatcher.stopAir(airReplicationName.toString());
+ } else if (PipeStatus.DROPPED.equals(pipeStatus)) {
+ dispatcher.dropAir(airReplicationName);
+ } else {
+ throw new IllegalArgumentException("Unsupported air status: " +
pipeStatus);
+ }
+ }
+
+ public Map<AirReplicationName, PipeStatus> getAllAirReplication() {
+ return selector.getAllAirReplication();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
new file mode 100644
index 00000000000..687285468ca
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.consensus.common.Peer;
+
+import java.util.Objects;
+
+public class AirReplicationName {
+ private static final String REPLICATION_AIR_NAME_SPLITTER_CHAR = "_";
+ private final ConsensusGroupId consensusGroupId;
+ private final int senderDataNodeId;
+ private final int receiverDataNodeId;
+
+ public AirReplicationName(Peer senderPeer, Peer receiverPeer) {
+ this.consensusGroupId = senderPeer.getGroupId();
+ this.senderDataNodeId = senderPeer.getNodeId();
+ this.receiverDataNodeId = receiverPeer.getNodeId();
+ }
+
+ public AirReplicationName(
+ ConsensusGroupId consensusGroupId, int senderDataNodeId, int
receiverDataNodeId) {
+ this.consensusGroupId = consensusGroupId;
+ this.senderDataNodeId = senderDataNodeId;
+ this.receiverDataNodeId = receiverDataNodeId;
+ }
+
+ public AirReplicationName(String airName) throws IllegalArgumentException {
+ if (!airName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ throw new IllegalArgumentException("Invalid air name: " + airName);
+ }
+ String[] airNameParts =
+ airName
+ .substring(PipeStaticMeta.CONSENSUS_PIPE_PREFIX.length())
+ .split(REPLICATION_AIR_NAME_SPLITTER_CHAR);
+ if (airNameParts.length != 3) {
+ throw new IllegalArgumentException("Invalid air name: " + airName);
+ }
+ this.consensusGroupId =
ConsensusGroupId.Factory.createFromString(airNameParts[0]);
+ this.senderDataNodeId = Integer.parseInt(airNameParts[1]);
+ this.receiverDataNodeId = Integer.parseInt(airNameParts[2]);
+ }
+
+ public ConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
+ }
+
+ public int getSenderDataNodeId() {
+ return senderDataNodeId;
+ }
+
+ public int getReceiverDataNodeId() {
+ return receiverDataNodeId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AirReplicationName that = (AirReplicationName) o;
+ return Objects.equals(consensusGroupId, that.consensusGroupId)
+ && Objects.equals(senderDataNodeId, that.senderDataNodeId)
+ && Objects.equals(receiverDataNodeId, that.receiverDataNodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(consensusGroupId, senderDataNodeId,
receiverDataNodeId);
+ }
+
+ @Override
+ public String toString() {
+ return String.join(
+ REPLICATION_AIR_NAME_SPLITTER_CHAR,
+ PipeStaticMeta.CONSENSUS_PIPE_PREFIX + consensusGroupId,
+ String.valueOf(senderDataNodeId),
+ String.valueOf(receiverDataNodeId));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
new file mode 100644
index 00000000000..ea254b4227e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferResp;
+
/**
 * Receiver side of an air-replication channel: handles transfer requests shipped from a
 * sender peer and manages receiver-side resources per data region.
 */
public interface AirReplicationReceiver {

  /** Handles one transfer request and returns the corresponding response. */
  TAirReplicationTransferResp receive(TAirReplicationTransferReq req);

  /** Releases any receiver-side resources held for the given data region. */
  void releaseReceiverResource(DataRegionId regionId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
new file mode 100644
index 00000000000..3bf5a81ac06
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+
+import java.util.Map;
+
/** Query interface for enumerating the air-replication tasks visible to this node. */
public interface AirReplicationSelector {

  /** Returns every known air-replication task mapped to its current status. */
  Map<AirReplicationName, PipeStatus> getAllAirReplication();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
new file mode 100644
index 00000000000..fe3bb009c17
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.air.airreplication;
+
/**
 * Progress view of one air-replication connector. Used by the sync-lag manager to compare
 * leader write progress against follower apply progress.
 */
public interface AirReplicationSink {

  /** Replication progress on the leader side; presumably a monotonically increasing index — TODO confirm. */
  long getLeaderReplicateProgress();

  /** Apply progress on the follower side, in the same index space as the leader progress. */
  long getFollowerApplyProgress();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
new file mode 100644
index 00000000000..9cf830b0edd
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.airreplication;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+
/**
 * Tracks and assigns replication progress indexes per consensus group, and exposes sync-lag
 * queries used during normal operation and region migration.
 */
public interface ReplicateProgressManager {

  /** Returns the current progress index of the given air-replication task. */
  ProgressIndex getProgressIndex(AirReplicationName airReplicationName);

  /** Assigns and returns the next progress index for the given consensus group. */
  ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId);

  /** Returns the highest progress index assigned so far for the given consensus group. */
  ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId);

  /** Returns the sync lag of one specific air-replication task within the group. */
  long getSyncLagForSpecificAirReplication(
      ConsensusGroupId consensusGroupId, AirReplicationName airReplicationName);

  /**
   * Pins the replicate index for the given task so it is not advanced during region
   * migration — NOTE(review): exact pinning semantics defined by implementations; confirm.
   */
  void pinReplicateIndexForRegionMigration(
      ConsensusGroupId consensusGroupId, AirReplicationName airReplicationName);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
new file mode 100644
index 00000000000..c666d7b7e03
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+ private final AirReplicationServerImpl impl;
+ private final AirReplicationSyncLagManager syncLagManager;
+
+ private Timer getStateMachineLockTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer userWriteStateMachineTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer replicaWriteStateMachineTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+ this.impl = impl;
+ this.syncLagManager =
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+ }
+
+ private static final String IMPL = "AirReplicationServerImpl";
+
+ public void recordGetStateMachineLockTime(long costTimeInNanos) {
+ getStateMachineLockTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+ userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+ replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindAutoGauge(metricService);
+ bindGauge(metricService);
+ bindStageTimer(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindAutoGauge(metricService);
+ unbindGauge(metricService);
+ unbindStageTimer(metricService);
+
+ // release corresponding resource
+ AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+ }
+
+ public void bindGauge(AbstractMetricService metricService) {
+ metricService
+ .getOrCreateGauge(
+ Metric.AIR_REPLICATION_MODE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.TYPE.toString(),
+ "replicateMode")
+ .set(impl.getReplicateMode());
+ }
+
+ public void unbindGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.GAUGE,
+ Metric.PIPE_CONSENSUS_MODE.toString(),
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.TYPE.toString(),
+ "replicateMode");
+ }
+
+ public void bindAutoGauge(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.AIR_REPLICATION.toString(),
+ MetricLevel.IMPORTANT,
+ syncLagManager,
+ PipeConsensusSyncLagManager::calculateSyncLag,
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId(),
+ Tag.TYPE.toString(),
+ "syncLag");
+ }
+
+ public void unbindAutoGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.AIR_REPLICATION.toString(),
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId(),
+ Tag.TYPE.toString(),
+ "syncLag");
+ }
+
+ public void bindStageTimer(AbstractMetricService metricService) {
+ getStateMachineLockTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "getStateMachineLock",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ userWriteStateMachineTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "userWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ replicaWriteStateMachineTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "replicaWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ }
+
+ public void unbindStageTimer(AbstractMetricService metricService) {
+ getStateMachineLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ userWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ replicaWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "getStateMachineLock",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "writeStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ Metric.AIR_REPLICATION.toString(),
+ Tag.TYPE.toString(),
+ "replicaWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
new file mode 100644
index 00000000000..1f796d38dea
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationSink;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is used to aggregate the write progress of all Connectors to
calculate the minimum
+ * synchronization progress of all follower copies, thereby calculating
syncLag.
+ *
+ * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of
this class.
+ */
+public class AirReplicationSyncLagManager {
+ long syncLag = Long.MIN_VALUE;
+ ReentrantLock lock = new ReentrantLock();
+ Map<AirReplicationName, AirReplicationSink> airReplication2ConnectorMap =
new ConcurrentHashMap<>();
+
+ /**
+ * pinnedCommitIndex - currentReplicateProgress. If res <= 0, indicating
that replication is
+ * finished.
+ */
+ public long getSyncLagForRegionMigration(
+ AirReplicationName airReplicationName, long pinnedCommitIndex) {
+ return
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+ .map(
+ airReplicationSink ->
+ Math.max(pinnedCommitIndex -
airReplicationSink.getFollowerApplyProgress(), 0L))
+ .orElse(0L);
+ }
+
+ /**
+ * userWriteProgress - currentReplicateProgress. If res <= 0, indicating
that replication is
+ * finished.
+ */
+ public long getSyncLagForSpecificAirReplication(AirReplicationName
airReplicationName) {
+ return
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+ .map(
+ airReplicationSink -> {
+ long userWriteProgress =
airReplicationSink.getLeaderReplicateProgress();
+ long replicateProgress =
airReplicationSink.getFollowerApplyProgress();
+ return Math.max(userWriteProgress - replicateProgress, 0L);
+ })
+ .orElse(0L);
+ }
+
+ public long getCurrentLeaderReplicateIndex(AirReplicationName
airReplicationName) {
+ return
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+ .map(AirReplicationSink::getLeaderReplicateProgress)
+ .orElse(0L);
+ }
+
+ public void addAirReplicationConnector(
+ AirReplicationName airReplicationName, AirReplicationSink
airReplicationSink) {
+ lock.lock();
+ try {
+ airReplication2ConnectorMap.put(airReplicationName, airReplicationSink);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void removeAirReplicationConnector(AirReplicationName
airReplicationName) {
+ lock.lock();
+ try {
+ airReplication2ConnectorMap.remove(airReplicationName);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * SyncLag represents the biggest difference between the current replica
users' write progress and
+ * the synchronization progress of all other replicas. The semantics is how
much data the leader
+ * has left to synchronize.
+ */
+ public long calculateSyncLag() {
+ lock.lock();
+ try {
+ // if there isn't a air replication task, the syncLag is 0
+ if (airReplication2ConnectorMap.isEmpty()) {
+ return 0;
+ }
+ // else we find the biggest gap between leader and replicas in all air
replication task.
+ syncLag = Long.MIN_VALUE;
+ airReplication2ConnectorMap
+ .keySet()
+ .forEach(
+ airReplicationName ->
+ syncLag =
+ Math.max(syncLag,
getSyncLagForSpecificAirReplication(airReplicationName)));
+ return syncLag;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void clear() {
+ this.airReplication2ConnectorMap.clear();
+ }
+
+ private AirReplicationSyncLagManager() {
+ // do nothing
+ }
+
+ private static class AirReplicationSyncLagManagerHolder {
+ private static Map<String, AirReplicationSyncLagManager>
REPLICATION_GROUP_ID_2_INSTANCE_MAP;
+
+ private AirReplicationSyncLagManagerHolder() {
+ // empty constructor
+ }
+
+ private static void build() {
+ if (REPLICATION_GROUP_ID_2_INSTANCE_MAP == null) {
+ REPLICATION_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+ }
+ }
+ }
+
+ public static AirReplicationSyncLagManager getInstance(String groupId) {
+ return
AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+ groupId, key -> new AirReplicationSyncLagManager());
+ }
+
+ public static void release(String groupId) {
+ AirReplicationSyncLagManager.getInstance(groupId).clear();
+
AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+ }
+
+ // Only when replication protocol is AirReplication, this method will be
called once when construct
+ // replication class.
+ public static void build() {
+ AirReplicationSyncLagManagerHolder.build();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
new file mode 100644
index 00000000000..09cbfe96cc3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
+
+public class AirReplicationRPCService extends ThriftService implements
AirReplicationRPCServiceMBean {
+
+ private final TEndPoint thisNode;
+ private final AirReplicationConfig config;
+ private AirReplicationRPCServiceProcessor airReplicationRPCServiceProcessor;
+
+ public AirReplicationRPCService(TEndPoint thisNode, AirReplicationConfig
config) {
+ this.thisNode = thisNode;
+ this.config = config;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.AIR_REPLICATION_SERVICE;
+ }
+
+ @Override
+ public void initSyncedServiceImpl(Object airReplicationRPCServiceProcessor) {
+ this.airReplicationRPCServiceProcessor =
+ (AirReplicationRPCServiceProcessor) airReplicationRPCServiceProcessor;
+ super.initSyncedServiceImpl(this.airReplicationRPCServiceProcessor);
+ }
+
+ @Override
+ public void initTProcessor() {
+ processor = new
AirReplicationIService.Processor<>(airReplicationRPCServiceProcessor);
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ try {
+ thriftServiceThread =
+ config.getRpc().isEnableSSL()
+ ? new ThriftServiceThread(
+ processor,
+ getID().getName(),
+ ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+ getBindIP(),
+ getBindPort(),
+ config.getRpc().getRpcMaxConcurrentClientNum(),
+ config.getRpc().getThriftServerAwaitTimeForStopService(),
+ new
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+ config.getRpc().isRpcThriftCompressionEnabled(),
+ config.getRpc().getSslKeyStorePath(),
+ config.getRpc().getSslKeyStorePassword(),
+ config.getRpc().getSslTrustStorePath(),
+ config.getRpc().getSslTrustStorePassword(),
+ ZeroCopyRpcTransportFactory.INSTANCE)
+ : new ThriftServiceThread(
+ processor,
+ getID().getName(),
+ ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+ getBindIP(),
+ getBindPort(),
+ config.getRpc().getRpcMaxConcurrentClientNum(),
+ config.getRpc().getThriftServerAwaitTimeForStopService(),
+ new
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+ config.getRpc().isRpcThriftCompressionEnabled(),
+ ZeroCopyRpcTransportFactory.INSTANCE);
+ } catch (RPCServiceException e) {
+ throw new IllegalAccessException(e.getMessage());
+ }
+
thriftServiceThread.setName(ThreadName.AIR_REPLICATION_RPC_SERVICE.getName());
+ }
+
+ @Override
+ public String getBindIP() {
+ return thisNode.getIp();
+ }
+
+ @Override
+ public int getBindPort() {
+ return thisNode.getPort();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
new file mode 100644
index 00000000000..716fa38e3fb
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+public class AirReplicationRPCServiceHandler implements TServerEventHandler {
+
+ private final AirReplicationRPCServiceProcessor processor;
+
+ public AirReplicationRPCServiceHandler(AirReplicationRPCServiceProcessor
processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public void preServe() {}
+
+ @Override
+ public ServerContext createContext(TProtocol input, TProtocol output) {
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext, TProtocol input,
TProtocol output) {
+ processor.handleExit();
+ }
+
+ @Override
+ public void processContext(
+ ServerContext serverContext, TTransport inputTransport, TTransport
outputTransport) {}
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
new file mode 100644
index 00000000000..52d0e7a53c1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
/** JMX MBean marker for {@code AirReplicationRPCService}; no attributes are exposed yet. */
public interface AirReplicationRPCServiceMBean {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
new file mode 100644
index 00000000000..aa36042e12f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusInactivatePeerKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.air.AirReplication;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedReq;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedResp;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationReq;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationResp;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationReq;
+import
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferResp;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
+import
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AirReplicationRPCServiceProcessor implements
AirReplicationIService.Iface {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(AirReplicationRPCServiceProcessor.class);
+ private final AirReplication airReplication;
+
+ private final AirReplicationConfig.Air config;
+
+ public AirReplicationRPCServiceProcessor(
+ AirReplication airReplication, AirReplicationConfig.Air config) {
+ this.airReplication = airReplication;
+ this.config = config;
+ }
+
+ @Override
+ public TAirReplicationTransferResp
airReplicationTransfer(TAirReplicationTransferReq req) {
+ return config.getAirReplicationReceiver().receive(req);
+ }
+
+ // TODO: consider batch transfer
+ @Override
+ public TAirReplicationBatchTransferResp airReplicationBatchTransfer(
+ TAirReplicationBatchTransferReq req) throws TException {
+ return new TAirReplicationBatchTransferResp();
+ }
+
+ @Override
+ public TSetActiveResp setActive(TSetActiveReq req) throws TException {
+ if (req.isForDeletionPurpose && !req.isActive) {
+
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
+ }
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for set active request
%s", groupId, req);
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TSetActiveResp(status);
+ }
+ impl.setActive(req.isActive);
+ if (req.isActive) {
+ KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
+ }
+ if (req.isForDeletionPurpose && !req.isActive) {
+
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
+ }
+ return new TSetActiveResp(RpcUtils.SUCCESS_STATUS);
+ }
+
+ @Override
+ public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
+ TNotifyPeerToCreateConsensusPipeReq req) throws TException {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
+ AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for create consensus pipe
request %s", groupId, req);
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TNotifyPeerToCreateConsensusPipeResp(status);
+ }
+ TSStatus responseStatus;
+ try {
+ // Other peers which don't act as coordinator will only transfer
data(may contain both
+ // historical and realtime data) after the snapshot progress.
+ impl.createConsensusPipeToTargetPeer(
+ new Peer(
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
+ req.targetPeerNodeId,
+ req.targetPeerEndPoint),
+ false);
+ responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (ConsensusGroupModifyPeerException e) {
+ responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ LOGGER.warn("Failed to create consensus pipe to target peer with req
{}", req, e);
+ }
+ return new TNotifyPeerToCreateConsensusPipeResp(responseStatus);
+ }
+
+ @Override
+ public TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe(
+ TNotifyPeerToDropConsensusPipeReq req) throws TException {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
+ AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for drop consensus pipe request
%s", groupId, req);
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TNotifyPeerToDropConsensusPipeResp(status);
+ }
+ TSStatus responseStatus;
+ try {
+ impl.dropConsensusPipeToTargetPeer(
+ new Peer(
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
+ req.targetPeerNodeId,
+ req.targetPeerEndPoint));
+ responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (ConsensusGroupModifyPeerException e) {
+ responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ LOGGER.warn("Failed to drop consensus pipe to target peer with req {}",
req, e);
+ }
+ return new TNotifyPeerToDropConsensusPipeResp(responseStatus);
+ }
+
+ @Override
+ public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(
+ TCheckConsensusPipeCompletedReq req) throws TException {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for check transfer completed
request %s",
+ groupId, req);
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TCheckConsensusPipeCompletedResp(status, true);
+ }
+ TSStatus responseStatus;
+ boolean isCompleted;
+ try {
+ isCompleted =
+ impl.isConsensusPipesTransmissionCompleted(
+ req.consensusPipeNames, req.refreshCachedProgressIndex);
+ responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (Exception e) {
+ responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ responseStatus.setMessage(e.getMessage());
+ isCompleted = true;
+ LOGGER.warn(
+ "Failed to check consensus pipe completed with req {}, set is
completed to {}",
+ req,
+ true,
+ e);
+ }
+ return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted);
+ }
+
+ @Override
+ public TWaitReleaseAllRegionRelatedResourceResp
waitReleaseAllRegionRelatedResource(
+ TWaitReleaseAllRegionRelatedResourceReq req) {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for
TWaitReleaseAllRegionRelatedResourceRes request",
+ groupId);
+ LOGGER.error(message);
+ return new TWaitReleaseAllRegionRelatedResourceResp(true);
+ }
+ return new TWaitReleaseAllRegionRelatedResourceResp(
+ impl.hasReleaseAllRegionRelatedResource(groupId));
+ }
+
+ public void handleExit() {}
+}