This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new ee0b463e189 [IOTDB-6118] Pipe: Support transfer data through air gap (IoTDBAirGapConnector) (#10883)
ee0b463e189 is described below

commit ee0b463e189aead00ee5399a89b57b3975f2dfac
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 17 14:53:26 2023 +0800

    [IOTDB-6118] Pipe: Support transfer data through air gap (IoTDBAirGapConnector) (#10883)
---
 .../src/main/java/org/apache/iotdb/SSLClient.java  |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  22 +-
 .../db/pipe/agent/receiver/PipeReceiverAgent.java  |  70 ++---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   6 +-
 .../config/constant/PipeConnectorConstant.java     |   4 +
 .../payload/airgap/AirGapOneByteResponse.java}     |   8 +-
 .../airgap/AirGapPseudoTPipeTransferRequest.java}  |   6 +-
 .../request/PipeTransferFilePieceReq.java          |  22 +-
 .../evolvable/request/PipeTransferFileSealReq.java |  22 +-
 .../request/PipeTransferHandshakeReq.java          |  20 +-
 .../request/PipeTransferInsertNodeReq.java         |  24 +-
 .../evolvable/request/PipeTransferTabletReq.java   | 111 ++++----
 ...TDBThriftConnector.java => IoTDBConnector.java} |   6 +-
 ...sion.java => IoTDBConnectorRequestVersion.java} |   6 +-
 .../protocol/airgap/IoTDBAirGapConnector.java      | 315 ++++++++++++++++++++-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   9 +-
 .../thrift/sync/IoTDBThriftSyncConnector.java      |  33 ++-
 .../pipe/receiver/airgap/IoTDBAirGapReceiver.java  | 180 +++++++++++-
 .../receiver/airgap/IoTDBAirGapReceiverAgent.java  |  97 +++++++
 ...iver.java => IoTDBLegacyPipeReceiverAgent.java} |  20 +-
 .../pipe/receiver/thrift/IoTDBThriftReceiver.java  |   4 +-
 .../thrift/IoTDBThriftReceiverAgent.java}          |  38 +--
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  22 +-
 .../connector/PipeConnectorSubtaskManager.java     |   4 +
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  14 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +
 .../resources/conf/iotdb-common.properties         |   7 +
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  19 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |  11 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |  20 ++
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   2 +
 .../builtin/connector/IoTDBAirGapConnector.java}   |  23 +-
 .../connector/IoTDBLegacyPipeConnector.java        |  54 +---
 .../builtin/connector/IoTDBThriftConnector.java    |  54 +---
 ...ipeConnector.java => PlaceholderConnector.java} |  12 +-
 .../apache/iotdb/commons/service/ServiceType.java  |   1 +
 .../org/apache/iotdb/tsfile/utils/BytesUtils.java  |   4 +-
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   1 +
 39 files changed, 917 insertions(+), 362 deletions(-)

diff --git a/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java b/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
index 8366fa507b3..e4b9f393db6 100644
--- a/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
+++ b/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
@@ -40,7 +40,7 @@ import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 
 public class SSLClient {
-  private static Logger logger = LoggerFactory.getLogger(SSLClient.class);
+  private static final Logger logger = LoggerFactory.getLogger(SSLClient.class);
   private static SSLConnectionSocketFactory sslConnectionSocketFactory = null;
   private static PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = null;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c85043f011c..061c290f12d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -92,22 +92,22 @@ public class IoTDBConfig {
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
   public static final String SYSTEM_DATABASE = "root.__system";
 
-  /** whether to enable the mqtt service. */
+  /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
 
-  /** the mqtt service binding host. */
+  /** The mqtt service binding host. */
   private String mqttHost = "127.0.0.1";
 
-  /** the mqtt service binding port. */
+  /** The mqtt service binding port. */
   private int mqttPort = 1883;
 
-  /** the handler pool size for handing the mqtt messages. */
+  /** The handler pool size for handing the mqtt messages. */
   private int mqttHandlerPoolSize = 1;
 
-  /** the mqtt message payload formatter. */
+  /** The mqtt message payload formatter. */
   private String mqttPayloadFormatter = "json";
 
-  /** max mqtt message size. Unit: byte */
+  /** Max mqtt message size. Unit: byte */
   private int mqttMaxMessageSize = 1048576;
 
   /** Rpc binding address. */
@@ -1075,7 +1075,7 @@ public class IoTDBConfig {
   private double maxMemoryRatioForQueue = 0.6;
 
   /** Pipe related */
-  private String pipeReceiveFileDir =
+  private String pipeReceiverFileDir =
       systemDir + File.separator + "pipe" + File.separator + "receiver";
 
   /** Resource control */
@@ -1209,7 +1209,7 @@ public class IoTDBConfig {
     triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
     pipeDir = addDataHomeDir(pipeDir);
     pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
-    pipeReceiveFileDir = addDataHomeDir(pipeReceiveFileDir);
+    pipeReceiverFileDir = addDataHomeDir(pipeReceiverFileDir);
     mqttDir = addDataHomeDir(mqttDir);
     extPipeDir = addDataHomeDir(extPipeDir);
     queryDir = addDataHomeDir(queryDir);
@@ -3681,12 +3681,12 @@ public class IoTDBConfig {
     return modeMapSizeThreshold;
   }
 
-  public void setPipeReceiverFileDir(String pipeReceiveFileDir) {
-    this.pipeReceiveFileDir = pipeReceiveFileDir;
+  public void setPipeReceiverFileDir(String pipeReceiverFileDir) {
+    this.pipeReceiverFileDir = pipeReceiverFileDir;
   }
 
   public String getPipeReceiverFileDir() {
-    return pipeReceiveFileDir;
+    return pipeReceiverFileDir;
   }
 
   public boolean isQuotaEnable() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index 5b5d0b2bd42..53967e17757 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -20,15 +20,9 @@
 package org.apache.iotdb.db.pipe.agent.receiver;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.db.pipe.receiver.airgap.IoTDBAirGapReceiverAgent;
+import org.apache.iotdb.db.pipe.receiver.legacy.IoTDBLegacyPipeReceiverAgent;
+import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -37,61 +31,31 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 
+/** PipeReceiverAgent is the entry point of all pipe receivers' logic. */
 public class PipeReceiverAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReceiverAgent.class);
 
-  private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new 
ThreadLocal<>();
+  private final IoTDBThriftReceiverAgent thriftAgent;
+  private final IoTDBAirGapReceiverAgent airGapAgent;
+  private final IoTDBLegacyPipeReceiverAgent legacyAgent;
 
-  public TPipeTransferResp receive(
-      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
-    final byte reqVersion = req.getVersion();
-    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
-      return getReceiver(reqVersion).receive(req, partitionFetcher, 
schemaFetcher);
-    } else {
-      return new TPipeTransferResp(
-          RpcUtils.getStatus(
-              TSStatusCode.PIPE_VERSION_ERROR,
-              String.format("Unsupported pipe version %d", reqVersion)));
-    }
+  public PipeReceiverAgent() {
+    thriftAgent = new IoTDBThriftReceiverAgent();
+    airGapAgent = new IoTDBAirGapReceiverAgent();
+    legacyAgent = new IoTDBLegacyPipeReceiverAgent();
   }
 
-  private IoTDBThriftReceiver getReceiver(byte reqVersion) {
-    if (receiverThreadLocal.get() == null) {
-      return setAndGetReceiver(reqVersion);
-    }
-
-    final byte receiverThreadLocalVersion = 
receiverThreadLocal.get().getVersion().getVersion();
-    if (receiverThreadLocalVersion != reqVersion) {
-      LOGGER.warn(
-          "The receiver version {} is different from the sender version {},"
-              + " the receiver will be reset to the sender version.",
-          receiverThreadLocalVersion,
-          reqVersion);
-      receiverThreadLocal.get().handleExit();
-      receiverThreadLocal.remove();
-      return setAndGetReceiver(reqVersion);
-    }
-
-    return receiverThreadLocal.get();
+  public IoTDBThriftReceiverAgent thrift() {
+    return thriftAgent;
   }
 
-  private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
-    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
-      receiverThreadLocal.set(new IoTDBThriftReceiverV1());
-    } else {
-      throw new UnsupportedOperationException(
-          String.format("Unsupported pipe version %d", reqVersion));
-    }
-    return receiverThreadLocal.get();
+  public IoTDBAirGapReceiverAgent airGap() {
+    return airGapAgent;
   }
 
-  public void handleClientExit() {
-    final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
-    if (receiver != null) {
-      receiver.handleExit();
-      receiverThreadLocal.remove();
-    }
+  public IoTDBLegacyPipeReceiverAgent legacy() {
+    return legacyAgent;
   }
 
   public void cleanPipeReceiverDir() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5112d1e730d..d21b7e270df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -52,8 +52,12 @@ public class PipeRuntimeAgent implements IService {
 
   public synchronized void preparePipeResources(
       ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
-    PipeAgent.receiver().cleanPipeReceiverDir();
+    // clean sender (connector) hardlink file dir
     PipeHardlinkFileDirStartupCleaner.clean();
+
+    // clean receiver file dir
+    PipeAgent.receiver().cleanPipeReceiverDir();
+
     PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
     simpleConsensusProgressIndexAssigner.start();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 24bd5a51e75..96477c3a3aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -33,6 +33,10 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
+  public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
+      "connector.air-gap.handshake-timeout-ms";
+  public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE 
= 5000;
+
   public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"connector.version";
   public static final String 
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
similarity index 79%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
index 2c70bd57107..b9b3bbaf086 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
@@ -17,6 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.connector.payload.airgap;
 
-public class IoTDBAirGapReceiver {}
+public class AirGapOneByteResponse {
+
+  public static final byte[] OK = new byte[] {0};
+  public static final byte[] FAIL = new byte[] {(byte) 0xFF};
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
similarity index 80%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
index 2c70bd57107..5441176077d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
@@ -17,6 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.connector.payload.airgap;
 
-public class IoTDBAirGapReceiver {}
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+public class AirGapPseudoTPipeTransferRequest extends TPipeTransferReq {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index ead6139d4e8..12f7843d906 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -54,6 +54,8 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     return filePiece;
   }
 
+  /////////////////////////////// Thrift ///////////////////////////////
+
   public static PipeTransferFilePieceReq toTPipeTransferReq(
       String fileName, long startWritingOffset, byte[] filePiece) throws 
IOException {
     final PipeTransferFilePieceReq filePieceReq = new 
PipeTransferFilePieceReq();
@@ -62,7 +64,7 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     filePieceReq.startWritingOffset = startWritingOffset;
     filePieceReq.filePiece = filePiece;
 
-    filePieceReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+    filePieceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
     filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
@@ -90,6 +92,22 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     return filePieceReq;
   }
 
+  /////////////////////////////// Air Gap ///////////////////////////////
+  public static byte[] toTPipeTransferBytes(
+      String fileName, long startWritingOffset, byte[] filePiece) throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_PIECE.getType(), 
outputStream);
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(startWritingOffset, outputStream);
+      ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index b77fa735045..e9dc715bdd1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -47,6 +47,8 @@ public class PipeTransferFileSealReq extends TPipeTransferReq 
{
     return fileLength;
   }
 
+  /////////////////////////////// Thrift ///////////////////////////////
+
   public static PipeTransferFileSealReq toTPipeTransferReq(String fileName, 
long fileLength)
       throws IOException {
     final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
@@ -54,7 +56,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq 
{
     fileSealReq.fileName = fileName;
     fileSealReq.fileLength = fileLength;
 
-    fileSealReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+    fileSealReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
     fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
@@ -80,6 +82,22 @@ public class PipeTransferFileSealReq extends 
TPipeTransferReq {
     return fileSealReq;
   }
 
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  public static byte[] toTPipeTransferFileSealBytes(String fileName, long 
fileLength)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_SEAL.getType(), 
outputStream);
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(fileLength, outputStream);
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 1fe2123a425..11d6ce7ef4b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -42,13 +42,15 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     return timestampPrecision;
   }
 
+  /////////////////////////////// Thrift ///////////////////////////////
+
   public static PipeTransferHandshakeReq toTPipeTransferReq(String 
timestampPrecision)
       throws IOException {
     final PipeTransferHandshakeReq handshakeReq = new 
PipeTransferHandshakeReq();
 
     handshakeReq.timestampPrecision = timestampPrecision;
 
-    handshakeReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+    handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
     handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
@@ -72,6 +74,20 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     return handshakeReq;
   }
 
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  public static byte[] toTransferHandshakeBytes(String timestampPrecision) 
throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), 
outputStream);
+      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index 7c375dbc6d4..c3da0985b45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -29,7 +29,12 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Objects;
 
 public class PipeTransferInsertNodeReq extends TPipeTransferReq {
@@ -82,12 +87,14 @@ public class PipeTransferInsertNodeReq extends 
TPipeTransferReq {
             insertNode));
   }
 
+  /////////////////////////////// Thrift ///////////////////////////////
+
   public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode 
insertNode) {
     final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
 
     req.insertNode = insertNode;
 
-    req.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
     req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
     req.body = insertNode.serializeToByteBuffer();
 
@@ -106,6 +113,19 @@ public class PipeTransferInsertNodeReq extends 
TPipeTransferReq {
     return insertNodeReq;
   }
 
+  /////////////////////////////// Air Gap ///////////////////////////////
+  public static byte[] toTransferInsertNodeBytes(InsertNode insertNode) throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(), 
outputStream);
+      return BytesUtils.concatByteArray(
+          byteArrayOutputStream.toByteArray(), 
insertNode.serializeToByteBuffer().array());
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 654aac9cb53..58e64520133 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -36,7 +36,6 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,23 +53,32 @@ public class PipeTransferTabletReq extends TPipeTransferReq 
{
   private Tablet tablet;
   private boolean isAligned;
 
-  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, 
boolean isAligned)
-      throws IOException {
-    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+  public InsertTabletStatement constructStatement() {
+    if (!checkSorted(tablet)) {
+      sortTablet(tablet);
+    }
 
-    tabletReq.tablet = tablet;
+    try {
+      final TSInsertTabletReq request = new TSInsertTabletReq();
 
-    tabletReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
-    tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
-    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
-        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      tablet.serialize(outputStream);
-      ReadWriteIOUtils.write(isAligned, outputStream);
-      tabletReq.body =
-          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-    }
+      for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+        request.addToMeasurements(measurementSchema.getMeasurementId());
+        request.addToTypes(measurementSchema.getType().ordinal());
+      }
 
-    return tabletReq;
+      request.setPrefixPath(tablet.deviceId);
+      request.setIsAligned(isAligned);
+      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+      request.setValues(SessionUtils.getValueBuffer(tablet));
+      request.setSize(tablet.rowSize);
+      request.setMeasurements(
+          
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
+
+      return StatementGenerator.createStatement(request);
+    } catch (MetadataException e) {
+      LOGGER.warn(String.format("Generate Statement from tablet %s error.", 
tablet), e);
+      return null;
+    }
   }
 
   private static boolean checkSorted(Tablet tablet) {
@@ -97,25 +105,12 @@ public class PipeTransferTabletReq extends 
TPipeTransferReq {
     int columnIndex = 0;
     for (int i = 0; i < tablet.getSchemas().size(); i++) {
       IMeasurementSchema schema = tablet.getSchemas().get(i);
-      if (schema instanceof MeasurementSchema) {
+      if (schema != null) {
         tablet.values[columnIndex] = sortList(tablet.values[columnIndex], 
schema.getType(), index);
         if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
           tablet.bitMaps[columnIndex] = 
sortBitMap(tablet.bitMaps[columnIndex], index);
         }
         columnIndex++;
-      } else {
-        int measurementSize = schema.getSubMeasurementsList().size();
-        for (int j = 0; j < measurementSize; j++) {
-          tablet.values[columnIndex] =
-              sortList(
-                  tablet.values[columnIndex],
-                  schema.getSubMeasurementsTSDataTypeList().get(j),
-                  index);
-          if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
-            tablet.bitMaps[columnIndex] = 
sortBitMap(tablet.bitMaps[columnIndex], index);
-          }
-          columnIndex++;
-        }
       }
     }
   }
@@ -196,6 +191,27 @@ public class PipeTransferTabletReq extends 
TPipeTransferReq {
     return sortedBitMap;
   }
 
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Wraps a tablet into a thrift transfer request.
+   *
+   * <p>The request body is the serialized tablet followed by the {@code isAligned} flag. Both the
+   * tablet and the flag are also kept on the returned object, so that it carries the same state
+   * as the values written into its body.
+   *
+   * @param tablet the tablet to transfer
+   * @param isAligned whether the tablet belongs to an aligned device
+   * @return the populated request
+   * @throws IOException if serializing the tablet fails
+   */
+  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet, boolean isAligned)
+      throws IOException {
+    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+    tabletReq.tablet = tablet;
+    // previously left at its default (false) even for aligned tablets
+    tabletReq.isAligned = isAligned;
+
+    tabletReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      tablet.serialize(outputStream);
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      tabletReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return tabletReq;
+  }
+
   public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
     final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
 
@@ -209,31 +225,16 @@ public class PipeTransferTabletReq extends 
TPipeTransferReq {
     return tabletReq;
   }
 
-  public InsertTabletStatement constructStatement() {
-    if (!checkSorted(tablet)) {
-      sortTablet(tablet);
-    }
-
-    try {
-      final TSInsertTabletReq request = new TSInsertTabletReq();
-
-      for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
-        request.addToMeasurements(measurementSchema.getMeasurementId());
-        request.addToTypes(measurementSchema.getType().ordinal());
-      }
-
-      request.setPrefixPath(tablet.deviceId);
-      request.setIsAligned(isAligned);
-      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-      request.setValues(SessionUtils.getValueBuffer(tablet));
-      request.setSize(tablet.rowSize);
-      request.setMeasurements(
-          
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
-      return StatementGenerator.createStatement(request);
-    } catch (MetadataException e) {
-      LOGGER.warn(String.format("Generate Statement from tablet %s error.", 
tablet), e);
-      return null;
+  /////////////////////////////// Air Gap ///////////////////////////////
+  public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean isAligned)
+      throws IOException {
+    // Written in this order: request version, request type, serialized tablet, isAligned flag.
+    try (final PublicBAOS buffer = new PublicBAOS();
+        final DataOutputStream out = new DataOutputStream(buffer)) {
+      ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), out);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET.getType(), out);
+      tablet.serialize(out);
+      ReadWriteIOUtils.write(isAligned, out);
+      return buffer.toByteArray();
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index f369d80840b..3b218c60e04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.db.pipe.connector.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -39,9 +39,9 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 
-public abstract class IoTDBThriftConnector implements PipeConnector {
+public abstract class IoTDBConnector implements PipeConnector {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnector.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConnector.class);
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
similarity index 85%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
index 29e59abc304..941a372a498 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.db.pipe.connector.protocol;
 
-public enum IoTDBThriftConnectorRequestVersion {
+public enum IoTDBConnectorRequestVersion {
   VERSION_1((byte) 1),
   ;
 
   private final byte version;
 
-  IoTDBThriftConnectorRequestVersion(byte type) {
+  IoTDBConnectorRequestVersion(byte type) {
     this.version = type;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 928e8eebbb1..8a0e11f5ebf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -19,39 +19,332 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 
-import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IoTDBAirGapConnector implements PipeConnector {
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.CRC32;
+
+import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+
+public class IoTDBAirGapConnector extends IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBAirGapConnector.class);
 
-  @Override
-  public void validate(PipeParameterValidator validator) throws Exception {}
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+  // Parallel lists: customize() adds one slot to each per entry of nodeUrls, so index i of both
+  // lists refers to nodeUrls.get(i). A slot in `sockets` is null until a connection is made.
+  private final List<Socket> sockets = new ArrayList<>();
+  private final List<Boolean> isSocketAlive = new ArrayList<>();
+
+  // Used as connect timeout and as read timeout while handshaking (see handshake()).
+  private int handshakeTimeoutMs;
+
+  // Round-robin cursor advanced by nextSocketIndex().
+  private long currentClientIndex = 0;
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
-      throws Exception {}
+      throws Exception {
+    super.customize(parameters, configuration);
+    // reserve one (not yet connected) socket slot per node url; handshake() fills them in
+    for (int i = 0; i < nodeUrls.size(); i++) {
+      isSocketAlive.add(false);
+      sockets.add(null);
+    }
+
+    // falls back to the default value when the key is absent from the parameters
+    handshakeTimeoutMs =
+        parameters.getIntOrDefault(
+            CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
+            CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
+    LOGGER.info(
+        "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", 
handshakeTimeoutMs);
+  }
 
   @Override
-  public void handshake() throws Exception {}
+  public void handshake() throws Exception {
+    // Connects to and handshakes with every endpoint that is not marked alive. A failure on one
+    // endpoint (connect error, I/O error, rejected handshake) is logged and skipped so that the
+    // remaining endpoints are still attempted. A PipeConnectionException is thrown only when no
+    // endpoint at all is alive afterwards.
+    for (int i = 0; i < sockets.size(); i++) {
+      if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
+        continue;
+      }
+
+      final String ip = nodeUrls.get(i).getIp();
+      final int port = nodeUrls.get(i).getPort();
+
+      // discard the socket left over from an earlier attempt, if any
+      closeSocketQuietly(sockets.set(i, null), ip, port);
+
+      final Socket socket = new Socket();
+
+      try {
+        socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
+        socket.setKeepAlive(true);
+        socket.setSoTimeout(handshakeTimeoutMs);
+        sockets.set(i, socket);
+        LOGGER.info("Successfully connected to target server ip: {}, port: {}.", ip, port);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to connect to target server ip: {}, port: {}, because: {}. Ignore it.",
+            ip,
+            port,
+            e.getMessage());
+        // this socket was never stored in `sockets`, so it has to be released here
+        closeSocketQuietly(socket, ip, port);
+        continue;
+      }
+
+      try {
+        final boolean accepted =
+            send(
+                socket,
+                PipeTransferHandshakeReq.toTransferHandshakeBytes(
+                    CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+        if (!accepted) {
+          // the socket stays in `sockets` and is closed by the next handshake() or by close()
+          LOGGER.warn("Handshake error with target server ip: {}, port: {}.", ip, port);
+          continue;
+        }
+
+        // switch from the handshake timeout to the regular transfer timeout before going live
+        socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTimeoutMs());
+        isSocketAlive.set(i, true);
+        LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, port);
+      } catch (IOException e) {
+        LOGGER.warn(
+            "Handshake error with target server ip: {}, port: {}, because: {}.",
+            ip,
+            port,
+            e.getMessage());
+      }
+    }
+
+    if (!isSocketAlive.contains(Boolean.TRUE)) {
+      throw new PipeConnectionException(
+          String.format("All target servers %s are not available.", nodeUrls));
+    }
+  }
+
+  // Closes the given socket (null is tolerated); a failure to close is logged, not propagated.
+  private void closeSocketQuietly(Socket socket, String ip, int port) {
+    if (socket == null) {
+      return;
+    }
+    try {
+      socket.close();
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to close socket with target server ip: {}, port: {}, because: {}. Ignore it.",
+          ip,
+          port,
+          e.getMessage());
+    }
+  }
 
   @Override
-  public void heartbeat() throws Exception {}
+  public void heartbeat() {
+    // The heartbeat re-runs the handshake, which attempts every endpoint that is currently not
+    // marked alive. A failure is only logged here, never propagated.
+    try {
+      handshake();
+    } catch (Exception e) {
+      final String reason = e.getMessage();
+      LOGGER.warn(
+          "Failed to reconnect to target server, because: {}. Try to reconnect later.",
+          reason,
+          e);
+    }
+  }
 
   @Override
-  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {}
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    // PipeProcessor can change the type of TabletInsertionEvent. Reject unsupported types before
+    // picking a socket, so that an event which is going to be ignored neither consumes a
+    // round-robin slot nor fails with a connection error when all sockets are dead.
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "IoTDBAirGapConnector only support "
+              + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+              + "Ignore {}.",
+          tabletInsertionEvent);
+      return;
+    }
+
+    final int socketIndex = nextSocketIndex();
+    final Socket socket = sockets.get(socketIndex);
+
+    try {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        doTransfer(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+      } else {
+        doTransfer(socket, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+      }
+    } catch (IOException e) {
+      // an I/O failure marks the socket dead so that a later handshake() reconnects it
+      isSocketAlive.set(socketIndex, false);
+
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tablet insertion event %s, because %s.",
+              tabletInsertionEvent, e.getMessage()),
+          e);
+    }
+  }
 
   @Override
-  public void transfer(Event event) throws Exception {}
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+    // PipeProcessor can change the type of tsFileInsertionEvent; anything other than a
+    // PipeTsFileInsertionEvent is logged and dropped.
+    final boolean supported = tsFileInsertionEvent instanceof PipeTsFileInsertionEvent;
+    if (!supported) {
+      LOGGER.warn(
+          "IoTDBAirGapConnector only support PipeTsFileInsertionEvent. Ignore {}.",
+          tsFileInsertionEvent);
+      return;
+    }
+
+    final int index = nextSocketIndex();
+    final Socket target = sockets.get(index);
+
+    try {
+      doTransfer(target, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (IOException e) {
+      // an I/O failure marks the chosen socket as dead before the error is reported
+      isSocketAlive.set(index, false);
+
+      final String message =
+          String.format(
+              "Network error when transfer tsfile insertion event %s, because %s.",
+              tsFileInsertionEvent, e.getMessage());
+      throw new PipeConnectionException(message, e);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) {
+    // Generic events are not sent anywhere by this connector: the event is only logged.
+    LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event: 
{}.", event);
+  }
+
+  private void doTransfer(
+      Socket socket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
+      throws PipeException, WALPipeException, IOException {
+    // serialize the insert node of the event, then ship it; anything but an OK reply is an error
+    final byte[] payload =
+        PipeTransferInsertNodeReq.toTransferInsertNodeBytes(
+            pipeInsertNodeTabletInsertionEvent.getInsertNode());
+    if (send(socket, payload)) {
+      return;
+    }
+
+    throw new PipeException(
+        String.format(
+            "Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s",
+            pipeInsertNodeTabletInsertionEvent, socket));
+  }
+
+  private void doTransfer(Socket socket, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
+      throws PipeException, IOException {
+    // convert the event to a tablet, serialize it, then ship it; a non-OK reply is an error
+    final byte[] payload =
+        PipeTransferTabletReq.toTPipeTransferTabletBytes(
+            pipeRawTabletInsertionEvent.convertToTablet(),
+            pipeRawTabletInsertionEvent.isAligned());
+    if (send(socket, payload)) {
+      return;
+    }
+
+    throw new PipeException(
+        String.format(
+            "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
+            pipeRawTabletInsertionEvent, socket));
+  }
+
+  private void doTransfer(Socket socket, PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, InterruptedException, IOException {
+    // block until the event reports its TsFile as closed, then look the file up
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+
+    // 1. Transfer the file piece by piece; every piece carries the file name and its offset
+    final int pieceCapacity = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final byte[] pieceBuffer = new byte[pieceCapacity];
+    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+      long offset = 0;
+      int pieceLength;
+      while ((pieceLength = reader.read(pieceBuffer)) != -1) {
+        // a partially filled buffer is trimmed so that no stale bytes are sent
+        final byte[] piece =
+            pieceLength == pieceCapacity
+                ? pieceBuffer
+                : Arrays.copyOfRange(pieceBuffer, 0, pieceLength);
+        final boolean accepted =
+            send(
+                socket,
+                PipeTransferFilePieceReq.toTPipeTransferBytes(tsFile.getName(), offset, piece));
+        if (!accepted) {
+          throw new PipeException(
+              String.format("Transfer file %s error. Socket %s.", tsFile, socket));
+        }
+        offset += pieceLength;
+      }
+    }
+
+    // 2. Transfer file seal signal, which means the file is transferred completely
+    final boolean sealed =
+        send(
+            socket,
+            PipeTransferFileSealReq.toTPipeTransferFileSealBytes(
+                tsFile.getName(), tsFile.length()));
+    if (!sealed) {
+      throw new PipeException(String.format("Seal file %s error. Socket %s.", tsFile, socket));
+    }
+    LOGGER.info("Successfully transferred file {}.", tsFile);
+  }
+
+  private int nextSocketIndex() {
+    // Round-robin: starting at the cursor, probe at most one full lap over the sockets and hand
+    // out the first slot that is marked alive. The cursor advances on every probe.
+    final int total = sockets.size();
+    int remainingProbes = total;
+    while (remainingProbes-- > 0) {
+      final int candidate = (int) (currentClientIndex++ % total);
+      if (Boolean.TRUE.equals(isSocketAlive.get(candidate))) {
+        return candidate;
+      }
+    }
+
+    throw new PipeConnectionException(
+        "All sockets are dead, please check the connection to the receiver.");
+  }
+
+  private boolean send(Socket socket, byte[] bytes) throws IOException {
+    if (socket.isConnected()) {
+      // write the framed payload in one go
+      final BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
+      out.write(enrichWithLengthAndChecksum(bytes));
+      out.flush();
+
+      // then wait for a one-byte reply; only AirGapOneByteResponse.OK counts as success
+      final byte[] reply = new byte[1];
+      final int replyLength = socket.getInputStream().read(reply);
+      if (replyLength <= 0) {
+        return false;
+      }
+      return Arrays.equals(AirGapOneByteResponse.OK, reply);
+    }
+
+    return false;
+  }
+
+  private byte[] enrichWithLengthAndChecksum(byte[] bytes) {
+    // CRC32 over the whole payload
+    final CRC32 checksum = new CRC32();
+    checksum.update(bytes);
+    final byte[] checksumBytes = BytesUtils.longToBytes(checksum.getValue());
+
+    // the announced length covers the checksum (LONG_LEN) plus the payload
+    final byte[] lengthBytes = BytesUtils.intToBytes(bytes.length + LONG_LEN);
+
+    // frame: length | length again (double length as simple checksum) | CRC32 | payload
+    return BytesUtils.concatByteArrayList(
+        Arrays.asList(lengthBytes, lengthBytes, checksumBytes, bytes));
+  }
 
   @Override
-  public void close() throws Exception {}
+  public void close() {
+    // Empty every slot, close whatever socket it held and mark the slot as not alive. A failure
+    // to close one socket is logged and does not stop the remaining ones from being closed.
+    for (int i = 0; i < sockets.size(); ++i) {
+      final Socket socket = sockets.set(i, null);
+      try {
+        if (socket != null) {
+          socket.close();
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close client {}.", i, e);
+      } finally {
+        isSocketAlive.set(i, false);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 28f6e9b10f8..a7ee768226b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
@@ -63,13 +63,14 @@ import java.util.Comparator;
 import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class IoTDBThriftAsyncConnector extends IoTDBThriftConnector {
+public class IoTDBThriftAsyncConnector extends IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
 
@@ -87,8 +88,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBThriftConnector {
   private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
   private final AtomicReference<Future<?>> retryTriggerFuture = new 
AtomicReference<>();
   private final IoTDBThriftSyncConnector retryConnector = new 
IoTDBThriftSyncConnector();
-  private final PriorityQueue<Pair<Long, Event>> retryEventQueue =
-      new PriorityQueue<>(Comparator.comparing(o -> o.left));
+  private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
+      new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
 
   private final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index a025f1742ae..e682247cc66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -29,7 +28,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -55,7 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-public class IoTDBThriftSyncConnector extends IoTDBThriftConnector {
+public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftSyncConnector.class);
 
@@ -70,10 +69,6 @@ public class IoTDBThriftSyncConnector extends 
IoTDBThriftConnector {
     // Do nothing
   }
 
-  public IoTDBThriftSyncConnector(String ipAddress, int port) {
-    nodeUrls.add(new TEndPoint(ipAddress, port));
-  }
-
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
@@ -87,7 +82,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBThriftConnector {
   @Override
   public void handshake() throws Exception {
     for (int i = 0; i < clients.size(); i++) {
-      if (isClientAlive.get(i)) {
+      if (Boolean.TRUE.equals(isClientAlive.get(i))) {
         continue;
       }
 
@@ -126,22 +121,26 @@ public class IoTDBThriftSyncConnector extends 
IoTDBThriftConnector {
                     PipeTransferHandshakeReq.toTPipeTransferReq(
                         
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
         if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          throw new PipeException(String.format("Handshake error, result 
status %s.", resp.status));
+          LOGGER.warn(
+              "Handshake error with target server ip: {}, port: {}, because: 
{}.",
+              ip,
+              port,
+              resp.status);
         } else {
           isClientAlive.set(i, true);
           LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, 
port);
         }
       } catch (TException e) {
-        throw new PipeConnectionException(
-            String.format(
-                "Handshake error with target server ip: %s, port: %s, because: 
%s",
-                ip, port, e.getMessage()),
-            e);
+        LOGGER.warn(
+            "Handshake error with target server ip: {}, port: {}, because: 
{}.",
+            ip,
+            port,
+            e.getMessage());
       }
     }
 
     for (int i = 0; i < clients.size(); i++) {
-      if (isClientAlive.get(i)) {
+      if (Boolean.TRUE.equals(isClientAlive.get(i))) {
         return;
       }
     }
@@ -193,7 +192,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBThriftConnector {
 
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    // PipeProcessor can change the type of TabletInsertionEvent
+    // PipeProcessor can change the type of tsFileInsertionEvent
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
       LOGGER.warn(
           "IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. 
Ignore {}.",
@@ -320,7 +319,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBThriftConnector {
     // Round-robin, find the next alive client
     for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
       final int clientIndex = (int) (currentClientIndex++ % clientSize);
-      if (isClientAlive.get(clientIndex)) {
+      if (Boolean.TRUE.equals(isClientAlive.get(clientIndex))) {
         return clientIndex;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
index 2c70bd57107..dd7a7d83556 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
@@ -19,4 +19,182 @@
 
 package org.apache.iotdb.db.pipe.receiver.airgap;
 
-public class IoTDBAirGapReceiver {}
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+
+import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
+
+/**
+ * Serves a single air gap sender connection: repeatedly reads one framed request from the socket,
+ * verifies its checksum, hands it to the thrift receiver agent and answers with a one-byte
+ * response.
+ *
+ * <p>Frame layout as read by this class: the payload length as a 4-byte int, transmitted twice for
+ * double checking, followed by that many payload bytes. The payload starts with a {@code
+ * LONG_LEN}-byte CRC32 value computed over the remaining payload bytes, which consist of a version
+ * byte, a type short and the request body.
+ */
+public class IoTDBAirGapReceiver extends WrappedRunnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiver.class);
+
+  /** Number of bytes used to encode the payload length (the field is transmitted twice). */
+  private static final int LENGTH_FIELD_BYTES = 4;
+
+  private final Socket socket;
+  private final long receiverId;
+
+  private final IoTDBThriftReceiverAgent agent;
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
+
+  /**
+   * @param socket the accepted connection to serve; it is closed when {@link #runMayThrow()} ends
+   * @param receiverId identifier used in log messages only
+   */
+  public IoTDBAirGapReceiver(Socket socket, long receiverId) {
+    this.socket = socket;
+    this.receiverId = receiverId;
+
+    agent = PipeAgent.receiver().thrift();
+    partitionFetcher = ClusterPartitionFetcher.getInstance();
+    schemaFetcher = ClusterSchemaFetcher.getInstance();
+  }
+
+  /**
+   * Handles requests until the socket is closed, the sender closes the connection, or an
+   * unrecoverable error occurs. The socket is always closed before this method returns.
+   *
+   * @throws Throwable any exception other than the sender closing the connection
+   */
+  @Override
+  public void runMayThrow() throws Throwable {
+    try {
+      // Configured inside the try block so that the socket is closed even if this fails
+      socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTimeoutMs());
+      socket.setKeepAlive(true);
+
+      LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
+
+      while (!socket.isClosed()) {
+        receive();
+      }
+      LOGGER.info(
+          "Pipe air gap receiver {} closed because socket is closed. Socket: {}",
+          receiverId,
+          socket);
+    } catch (EOFException e) {
+      // A sender that hangs up is a regular way for the connection to end, not an error
+      LOGGER.info(
+          "Pipe air gap receiver {} closed because the sender closed the connection. Socket: {}",
+          receiverId,
+          socket);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Pipe air gap receiver {} closed because of exception. Socket: {}",
+          receiverId,
+          socket,
+          e);
+      throw e;
+    } finally {
+      try {
+        PipeAgent.receiver().thrift().handleClientExit();
+      } finally {
+        // Must not be skipped when the clean up above throws
+        socket.close();
+      }
+    }
+  }
+
+  /**
+   * Reads one request, processes it and writes the one-byte response.
+   *
+   * @throws EOFException if the sender closed the connection
+   * @throws IOException if the response cannot be written
+   */
+  private void receive() throws IOException {
+    // NOTE(review): a new BufferedInputStream is created per request, so bytes it read ahead
+    // would be dropped if a sender ever pipelined requests - confirm senders wait for the
+    // response before sending the next request.
+    final InputStream inputStream = new BufferedInputStream(socket.getInputStream());
+
+    try {
+      final byte[] data = readData(inputStream);
+
+      if (!checkSum(data)) {
+        LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
+        fail();
+        return;
+      }
+
+      // Skip the checksum, which has been verified above
+      final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN);
+      // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
+      final AirGapPseudoTPipeTransferRequest req =
+          (AirGapPseudoTPipeTransferRequest)
+              new AirGapPseudoTPipeTransferRequest()
+                  .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
+                  .setType(ReadWriteIOUtils.readShort(byteBuffer))
+                  .setBody(byteBuffer.slice());
+      final TPipeTransferResp resp = agent.receive(req, partitionFetcher, schemaFetcher);
+
+      if (resp.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        ok();
+      } else {
+        LOGGER.warn(
+            "Handle data failed, receiverId: {}, status: {}, req: {}",
+            receiverId,
+            resp.getStatus(),
+            req);
+        fail();
+      }
+    } catch (EOFException e) {
+      // Nobody is left to answer: let the run loop terminate instead of replying FAIL forever
+      throw e;
+    } catch (Exception e) {
+      LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
+      fail();
+    }
+  }
+
+  /** Tells the sender that the request has been handled successfully. */
+  private void ok() throws IOException {
+    final OutputStream outputStream = socket.getOutputStream();
+    outputStream.write(AirGapOneByteResponse.OK);
+    outputStream.flush();
+  }
+
+  /** Tells the sender that the request could not be handled. */
+  private void fail() throws IOException {
+    final OutputStream outputStream = socket.getOutputStream();
+    outputStream.write(AirGapOneByteResponse.FAIL);
+    outputStream.flush();
+  }
+
+  /**
+   * Checks that the CRC32 value stored in the first {@code LONG_LEN} bytes matches the CRC32 of
+   * the remaining bytes.
+   *
+   * @return {@code false} if the payload is too short to hold a checksum or the values differ
+   */
+  private boolean checkSum(byte[] bytes) {
+    if (bytes.length < LONG_LEN) {
+      // Covers the empty payload returned by readData() for an invalid length
+      return false;
+    }
+
+    final CRC32 crc32 = new CRC32();
+    crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
+    return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) == crc32.getValue();
+  }
+
+  /**
+   * Reads the payload of the next frame.
+   *
+   * @return the payload, or an empty array if the announced length is invalid
+   * @throws EOFException if the stream ends before the frame is complete
+   */
+  private byte[] readData(InputStream inputStream) throws IOException {
+    final int length = readLength(inputStream);
+
+    if (length <= 0) {
+      // Will fail() after checkSum()
+      return new byte[0];
+    }
+
+    final byte[] data = new byte[length];
+    readFully(inputStream, data);
+    return data;
+  }
+
+  /**
+   * Read the length of the following data. The thread may typically block here when there is no
+   * data to read.
+   *
+   * @return the announced length, or 0 if the two transmitted copies of the length differ
+   * @throws EOFException if the stream ends before both copies have been read
+   */
+  private int readLength(InputStream inputStream) throws IOException {
+    final byte[] firstCopy = new byte[LENGTH_FIELD_BYTES];
+    readFully(inputStream, firstCopy);
+
+    // for double check
+    final byte[] secondCopy = new byte[LENGTH_FIELD_BYTES];
+    readFully(inputStream, secondCopy);
+
+    return Arrays.equals(firstCopy, secondCopy) ? BytesUtils.bytesToInt(firstCopy) : 0;
+  }
+
+  /**
+   * Fills {@code buffer} completely. A single {@link InputStream#read(byte[], int, int)} call may
+   * return fewer bytes than requested, so reading is repeated until the buffer is full.
+   *
+   * @throws EOFException if the stream ends before the buffer is full
+   */
+  private static void readFully(InputStream inputStream, byte[] buffer) throws IOException {
+    int offset = 0;
+    while (offset < buffer.length) {
+      final int readBytes = inputStream.read(buffer, offset, buffer.length - offset);
+      if (readBytes < 0) {
+        throw new EOFException(
+            "Stream ended after " + offset + " of " + buffer.length + " expected bytes");
+      }
+      offset += readBytes;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
new file mode 100644
index 00000000000..497ae6ce834
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.airgap;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Service that listens on the configured air gap receiver port and starts one {@link
+ * IoTDBAirGapReceiver} thread per accepted connection.
+ */
+public class IoTDBAirGapReceiverAgent implements IService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiverAgent.class);
+
+  private final ExecutorService listenExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName());
+  // True between start() and stop(); listen() only re-submits itself while it is set
+  private final AtomicBoolean allowSubmitListen = new AtomicBoolean(false);
+
+  private ServerSocket serverSocket;
+
+  private final AtomicLong receiverId = new AtomicLong(0);
+
+  /**
+   * Accepts one connection, starts a receiver thread for it, and re-submits itself to the listen
+   * executor while the agent is running and the server socket is still open.
+   */
+  public void listen() {
+    try {
+      final Socket socket = serverSocket.accept();
+      new Thread(new IoTDBAirGapReceiver(socket, receiverId.incrementAndGet())).start();
+    } catch (IOException e) {
+      if (allowSubmitListen.get()) {
+        LOGGER.warn("Unhandled exception during pipe air gap receiver listening", e);
+      } else {
+        // Expected: stop() closed the server socket while accept() was blocked
+        LOGGER.info("Pipe air gap receiver stopped listening because the agent was stopped");
+      }
+    }
+
+    // A closed server socket can never accept again, so re-submitting would only spin
+    if (allowSubmitListen.get() && !serverSocket.isClosed() && !listenExecutor.isShutdown()) {
+      listenExecutor.submit(this::listen);
+    }
+  }
+
+  /**
+   * Opens the server socket on the configured port and starts listening.
+   *
+   * @throws StartupException if the server socket cannot be opened
+   */
+  @Override
+  public void start() throws StartupException {
+    try {
+      serverSocket = new ServerSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+
+    allowSubmitListen.set(true);
+    listenExecutor.submit(this::listen);
+
+    LOGGER.info("IoTDBAirGapReceiverAgent {} started.", serverSocket);
+  }
+
+  /** Stops accepting new connections and shuts the listen executor down. */
+  @Override
+  public void stop() {
+    // Cleared before closing the socket so that listen() treats the resulting accept() failure
+    // as expected and does not re-submit itself
+    allowSubmitListen.set(false);
+
+    // The socket is absent when start() failed or was never called
+    if (serverSocket != null) {
+      try {
+        serverSocket.close();
+      } catch (IOException e) {
+        LOGGER.warn("Failed to close IoTDBAirGapReceiverAgent's server socket", e);
+      }
+    }
+
+    listenExecutor.shutdown();
+
+    LOGGER.info("IoTDBAirGapReceiverAgent {} stopped.", serverSocket);
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.AIR_GAP_SERVICE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index a3c7020c71c..19dc3d51781 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -52,9 +52,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class IoTDBLegacyPipeReceiver {
+public class IoTDBLegacyPipeReceiverAgent {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBLegacyPipeReceiver.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBLegacyPipeReceiverAgent.class);
 
   private static final String PATCH_SUFFIX = ".patch";
 
@@ -162,7 +162,7 @@ public class IoTDBLegacyPipeReceiver {
    * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; 
{@link
    *     TSStatusCode#SUCCESS_STATUS} if load successfully.
    * @throws TException The connection between the sender and the receiver has 
not been established
-   *     by {@link IoTDBLegacyPipeReceiver#handshake}
+   *     by {@link IoTDBLegacyPipeReceiverAgent#handshake}
    */
   public TSStatus transportPipeData(ByteBuffer buff) throws TException {
     // step1. check connection
@@ -260,7 +260,7 @@ public class IoTDBLegacyPipeReceiver {
    *     TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to 
rollback because
    *     mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive 
file.
    * @throws TException The connection between the sender and the receiver has 
not been established
-   *     by {@link IoTDBLegacyPipeReceiver#handshake}
+   *     by {@link IoTDBLegacyPipeReceiverAgent#handshake}
    */
   public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff)
       throws TException {
@@ -439,16 +439,4 @@ public class IoTDBLegacyPipeReceiver {
       return index;
     }
   }
-
-  ///////////////////////// singleton /////////////////////////
-
-  private IoTDBLegacyPipeReceiver() {}
-
-  public static IoTDBLegacyPipeReceiver getInstance() {
-    return IoTDBSyncReceiverHolder.INSTANCE;
-  }
-
-  private static class IoTDBSyncReceiverHolder {
-    private static final IoTDBLegacyPipeReceiver INSTANCE = new 
IoTDBLegacyPipeReceiver();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
index d898355bfd1..b34b5c123da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.receiver.thrift;
 
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -27,7 +27,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 public interface IoTDBThriftReceiver {
 
-  IoTDBThriftConnectorRequestVersion getVersion();
+  IoTDBConnectorRequestVersion getVersion();
 
   TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
similarity index 68%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
index 5b5d0b2bd42..848e31d3400 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
@@ -17,12 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent.receiver;
+package org.apache.iotdb.db.pipe.receiver.thrift;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -30,23 +27,19 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
+public class IoTDBThriftReceiverAgent {
 
-public class PipeReceiverAgent {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReceiverAgent.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftReceiverAgent.class);
 
   private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new 
ThreadLocal<>();
 
   public TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     final byte reqVersion = req.getVersion();
-    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+    if (reqVersion == IoTDBConnectorRequestVersion.VERSION_1.getVersion()) {
       return getReceiver(reqVersion).receive(req, partitionFetcher, 
schemaFetcher);
     } else {
       return new TPipeTransferResp(
@@ -77,7 +70,7 @@ public class PipeReceiverAgent {
   }
 
   private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
-    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+    if (reqVersion == IoTDBConnectorRequestVersion.VERSION_1.getVersion()) {
       receiverThreadLocal.set(new IoTDBThriftReceiverV1());
     } else {
       throw new UnsupportedOperationException(
@@ -93,23 +86,4 @@ public class PipeReceiverAgent {
       receiverThreadLocal.remove();
     }
   }
-
-  public void cleanPipeReceiverDir() {
-    final File receiverFileDir =
-        new 
File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
-
-    try {
-      FileUtils.deleteDirectory(receiverFileDir);
-      LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
-    } catch (Exception e) {
-      LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
-    }
-
-    try {
-      FileUtils.forceMkdir(receiverFileDir);
-      LOGGER.info("Create pipe receiver dir {} successfully.", 
receiverFileDir);
-    } catch (IOException e) {
-      LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
-    }
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 1f79ebcdf5d..2aebbd7a3e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
@@ -30,7 +31,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -84,7 +85,9 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           return handleTransferTablet(
               PipeTransferTabletReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
         case TRANSFER_FILE_PIECE:
-          return 
handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
+          return handleTransferFilePiece(
+              PipeTransferFilePieceReq.fromTPipeTransferReq(req),
+              req instanceof AirGapPseudoTPipeTransferRequest);
         case TRANSFER_FILE_SEAL:
           return handleTransferFileSeal(
               PipeTransferFileSealReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
@@ -181,11 +184,20 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
             : executeStatement(statement, partitionFetcher, schemaFetcher));
   }
 
-  private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq 
req) {
+  private TPipeTransferResp handleTransferFilePiece(
+      PipeTransferFilePieceReq req, boolean isRequestThroughAirGap) {
     try {
       updateWritingFileIfNeeded(req.getFileName());
 
       if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
+        if (isRequestThroughAirGap) {
+          // If the request is through air gap, the sender will resend the 
file piece from the
+          // beginning of the file.
+          // So the receiver should reset the offset of the writing file to 
the beginning of the
+          // file.
+          writingFileWriter.setLength(0);
+        }
+
         final TSStatus status =
             RpcUtils.getStatus(
                 TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET,
@@ -490,7 +502,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   }
 
   @Override
-  public IoTDBThriftConnectorRequestVersion getVersion() {
-    return IoTDBThriftConnectorRequestVersion.VERSION_1;
+  public IoTDBConnectorRequestVersion getVersion() {
+    return IoTDBConnectorRequestVersion.VERSION_1;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 6fd0d776219..fb06743874f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
@@ -74,6 +75,9 @@ public class PipeConnectorSubtaskManager {
       } else if (connectorKey.equals(
           BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
         pipeConnector = new IoTDBLegacyPipeConnector();
+      } else if (connectorKey.equals(
+          BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
+        pipeConnector = new IoTDBAirGapConnector();
       } else {
         pipeConnector = 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 25e7d3db2c6..b50766b8f08 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.receiver.legacy.IoTDBLegacyPipeReceiver;
 import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -2528,7 +2527,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus handshake(TSyncIdentityInfo info) throws TException {
-    return IoTDBLegacyPipeReceiver.getInstance()
+    return PipeAgent.receiver()
+        .legacy()
         .handshake(
             info,
             SESSION_MANAGER.getCurrSession().getClientAddress(),
@@ -2538,17 +2538,17 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus sendPipeData(ByteBuffer buff) throws TException {
-    return IoTDBLegacyPipeReceiver.getInstance().transportPipeData(buff);
+    return PipeAgent.receiver().legacy().transportPipeData(buff);
   }
 
   @Override
   public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) 
throws TException {
-    return IoTDBLegacyPipeReceiver.getInstance().transportFile(metaInfo, buff);
+    return PipeAgent.receiver().legacy().transportFile(metaInfo, buff);
   }
 
   @Override
   public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
-    return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher);
+    return PipeAgent.receiver().thrift().receive(req, partitionFetcher, 
schemaFetcher);
   }
 
   @Override
@@ -2688,7 +2688,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       TSCloseSessionReq req = new TSCloseSessionReq();
       closeSession(req);
     }
-    IoTDBLegacyPipeReceiver.getInstance().handleClientExit();
-    PipeAgent.receiver().handleClientExit();
+    PipeAgent.receiver().thrift().handleClientExit();
+    PipeAgent.receiver().legacy().handleClientExit();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1230d9be8d2..37b4bb4a8d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -886,6 +887,9 @@ public class DataNode implements DataNodeMBean {
     if 
(IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
       registerManager.register(RestService.getInstance());
     }
+    if (PipeConfig.getInstance().getPipeAirGapReceiverEnabled()) {
+      registerManager.register(PipeAgent.receiver().airGap());
+    }
   }
 
   private void deactivate() {
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 22b9083f14e..d3279721947 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -960,6 +960,13 @@ cluster_name=defaultCluster
 # The maximum number of clients that can be used in the async connector.
 # pipe_async_connector_max_client_number=16
 
+# Whether to enable receiving pipe data through air gap.
+# The receiver can only return 0 or 1 in tcp mode to indicate whether the data 
is received successfully.
+# pipe_air_gap_receiver_enabled=false
+
+# The port for the server to receive pipe data through air gap.
+# pipe_air_gap_receiver_port=9780
+
 ####################
 ### RatisConsensus Configuration
 ####################
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3d26b9e8ee7..ebc7a2ef511 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -134,6 +134,7 @@ public enum ThreadName {
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"),
   PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
+  PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
   // -------------------------- JVM --------------------------
@@ -270,6 +271,7 @@ public enum ThreadName {
               PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
               PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER,
               PIPE_WAL_RESOURCE_TTL_CHECKER,
+              PIPE_RECEIVER_AIR_GAP_AGENT,
               WINDOW_EVALUATION_SERVICE,
               STATEFUL_TRIGGER_INFORMATION_UPDATER));
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 97932889caa..abc39f11ee6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -180,6 +180,9 @@ public class CommonConfig {
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
   private boolean pipeAutoRestartEnabled = true;
 
+  private boolean pipeAirGapReceiverEnabled = false;
+  private int pipeAirGapReceiverPort = 9780;
+
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
 
@@ -702,6 +705,22 @@ public class CommonConfig {
         pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
 
+  /** Sets whether receiving pipe data through air gap is enabled. */
+  public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
+    this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
+  }
+
+  /** Returns whether receiving pipe data through air gap is enabled. */
+  public boolean getPipeAirGapReceiverEnabled() {
+    return pipeAirGapReceiverEnabled;
+  }
+
+  /** Sets the port on which the server receives pipe data through air gap. */
+  public void setPipeAirGapReceiverPort(int pipeAirGapReceiverPort) {
+    this.pipeAirGapReceiverPort = pipeAirGapReceiverPort;
+  }
+
+  /** Returns the port on which the server receives pipe data through air gap. */
+  public int getPipeAirGapReceiverPort() {
+    return pipeAirGapReceiverPort;
+  }
+
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 8ca5dbbd85e..0922b96452d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -377,6 +377,17 @@ public class CommonDescriptor {
         Boolean.parseBoolean(
             properties.getProperty(
                 "pipe_auto_restart_enabled", 
String.valueOf(config.getPipeAutoRestartEnabled()))));
+
+    config.setPipeAirGapReceiverEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_air_gap_receiver_enabled",
+                Boolean.toString(config.getPipeAirGapReceiverEnabled()))));
+    config.setPipeAirGapReceiverPort(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_air_gap_receiver_port",
+                Integer.toString(config.getPipeAirGapReceiverPort()))));
   }
 
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 355d0851207..4b46ac5e0fa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -149,6 +149,16 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAutoRestartEnabled();
   }
 
+  /////////////////////////////// Air Gap Receiver 
///////////////////////////////
+
+  public boolean getPipeAirGapReceiverEnabled() {
+    return COMMON_CONFIG.getPipeAirGapReceiverEnabled();
+  }
+
+  public int getPipeAirGapReceiverPort() {
+    return COMMON_CONFIG.getPipeAirGapReceiverPort();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
@@ -184,6 +194,13 @@ public class PipeConfig {
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
     LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
     LOGGER.info("PipeConnectorPendingQueueSize: {}", 
getPipeConnectorPendingQueueSize());
+    LOGGER.info(
+        "PipeConnectorRPCThriftCompressionEnabled: {}",
+        isPipeConnectorRPCThriftCompressionEnabled());
+
+    LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
+    LOGGER.info("PipeAsyncConnectorCoreClientNumber: {}", 
getPipeAsyncConnectorCoreClientNumber());
+    LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
@@ -196,6 +213,9 @@ public class PipeConfig {
         "PipeMetaSyncerAutoRestartPipeCheckIntervalRound: {}",
         getPipeMetaSyncerAutoRestartPipeCheckIntervalRound());
     LOGGER.info("PipeAutoRestartEnabled: {}", getPipeAutoRestartEnabled());
+
+    LOGGER.info("PipeAirGapReceiverEnabled: {}", 
getPipeAirGapReceiverEnabled());
+    LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index a2f1795ed27..ef973b1cc30 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
@@ -41,6 +42,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", 
IoTDBLegacyPipeConnector.class),
+  IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", 
IoTDBAirGapConnector.class),
   ;
 
   private final String pipePluginName;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
similarity index 63%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
index 29e59abc304..81fa0b03a3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
@@ -17,19 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-public enum IoTDBThriftConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
-
-  private final byte version;
-
-  IoTDBThriftConnectorRequestVersion(byte type) {
-    this.version = type;
-  }
-
-  public byte getVersion() {
-    return version;
-  }
-}
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the IoTDB Air Gap
+ * connector. There is a real implementation in the server module but cannot 
be imported here. The
+ * pipe agent in the server module will replace this class with the real 
implementation when
+ * initializing the IoTDB Air Gap connector.
+ */
+public class IoTDBAirGapConnector extends PlaceholderConnector {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
index e527f1c72d3..59b0d12ccdd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
@@ -19,62 +19,10 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-import org.apache.iotdb.pipe.api.PipeConnector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-
 /**
  * This class is a placeholder and should not be initialized. It represents 
the IoTDB legacy pipe
  * connector (for IoTDB v1.1). There is a real implementation in the server 
module but cannot be
  * imported here. The pipe agent in the server module will replace this class 
with the real
  * implementation when initializing the IoTDB legacy pipe connector.
  */
-public class IoTDBLegacyPipeConnector implements PipeConnector {
-  private static final String PLACEHOLDER_ERROR_MSG =
-      "This class is a placeholder and should not be used.";
-
-  @Override
-  public void validate(PipeParameterValidator validator) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void customize(
-      PipeParameters parameters, PipeConnectorRuntimeConfiguration 
configuration) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void handshake() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void heartbeat() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void transfer(Event event) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void close() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-}
+public class IoTDBLegacyPipeConnector extends PlaceholderConnector {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index b3772d361d3..23f4065fe52 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -19,62 +19,10 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-import org.apache.iotdb.pipe.api.PipeConnector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-
 /**
  * This class is a placeholder and should not be initialized. It represents 
the IoTDB Thrift
  * connector. There is a real implementation in the server module but cannot 
be imported here. The
  * pipe agent in the server module will replace this class with the real 
implementation when
  * initializing the IoTDB Thrift connector.
  */
-public class IoTDBThriftConnector implements PipeConnector {
-  private static final String PLACEHOLDER_ERROR_MSG =
-      "This class is a placeholder and should not be used.";
-
-  @Override
-  public final void validate(PipeParameterValidator validator) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void customize(
-      PipeParameters parameters, PipeConnectorRuntimeConfiguration 
configuration) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void handshake() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void heartbeat() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void transfer(TabletInsertionEvent tabletInsertionEvent) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void transfer(Event event) {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public final void close() {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-}
+public class IoTDBThriftConnector extends PlaceholderConnector {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
similarity index 86%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
index e527f1c72d3..dc8f4d25579 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
@@ -28,12 +28,14 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the IoTDB legacy pipe
- * connector (for IoTDB v1.1). There is a real implementation in the server 
module but cannot be
- * imported here. The pipe agent in the server module will replace this class 
with the real
- * implementation when initializing the IoTDB legacy pipe connector.
+ * This class is a placeholder and should not be initialized. It represents 
all the IoTDB pipe
+ * connectors that cannot be implemented in the node-commons module. Each 
IoTDB pipe connector has
+ * a real implementation in the server module but cannot be imported here. The 
pipe agent in the
+ * server module will replace this class with the real implementation when 
initializing the IoTDB
+ * pipe connector.
  */
-public class IoTDBLegacyPipeConnector implements PipeConnector {
+public class PlaceholderConnector implements PipeConnector {
+
   private static final String PLACEHOLDER_ERROR_MSG =
       "This class is a placeholder and should not be used.";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 5e43452898b..74be88f288c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -26,6 +26,7 @@ public enum ServiceType {
   RPC_SERVICE("RPC ServerService", "RPCService"),
   INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
   MQTT_SERVICE("MQTTService", "MqttService"),
+  AIR_GAP_SERVICE("AirGapService", "AirGapService"),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
   WAL_SERVICE("WAL ServerService", "WalService"),
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
index 418f7d136ab..321799d0d4a 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
@@ -688,10 +688,10 @@ public class BytesUtils {
    */
   public static byte[] subBytes(byte[] src, int start, int length) {
     if ((start + length) > src.length) {
-      return null;
+      return new byte[0];
     }
     if (length <= 0) {
-      return null;
+      return new byte[0];
     }
     byte[] result = new byte[length];
     System.arraycopy(src, start, result, 0, length);
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 90c2d0f22cf..18a459a6ee0 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.tsfile.utils;
 
 import java.io.ByteArrayOutputStream;

Reply via email to