This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e161f2528b9 [IOTDB-6001] Pipe: a non-blocking iotdb connector 
(iotdb_thrift_connector_v2) (#10174)
e161f2528b9 is described below

commit e161f2528b9208868cee692a58b34596756a82dd
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 16 10:23:54 2023 +0800

    [IOTDB-6001] Pipe: a non-blocking iotdb connector 
(iotdb_thrift_connector_v2) (#10174)
---
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../apache/iotdb/pipe/api/access/RowIterator.java  |  75 -----
 .../iotdb/commons/client/ClientPoolFactory.java    |  24 ++
 .../async/AsyncPipeDataTransferServiceClient.java  | 167 +++++++++++
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   4 +
 .../builtin/connector/IoTDBThriftConnector.java    |  16 +-
 .../builtin/connector/IoTDBThriftConnectorV1.java  |  17 +-
 .../builtin/connector/IoTDBThriftConnectorV2.java  |  17 +-
 .../pipe/agent/receiver/IoTDBThriftReceiver.java   |   6 +-
 .../db/pipe/agent/receiver/PipeReceiverAgent.java  |  10 +-
 .../config/constant/PipeConnectorConstant.java     |   2 +
 ...ava => IoTDBThriftConnectorRequestVersion.java} |   7 +-
 ...orImplV1_1.java => IoTDBSyncConnectorV1_1.java} |  12 +-
 .../{ => v1}/IoTDBThriftConnectorClient.java       |   2 +-
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |   5 +-
 .../pipe/connector/v1/IoTDBThriftReceiverV1.java   |   8 +-
 .../v1/request/PipeTransferFilePieceReq.java       |   4 +-
 .../v1/request/PipeTransferFileSealReq.java        |   4 +-
 .../v1/request/PipeTransferHandshakeReq.java       |   4 +-
 .../v1/request/PipeTransferInsertNodeReq.java      |   4 +-
 .../v1/request/PipeTransferTabletReq.java          |   6 +-
 .../pipe/connector/v2/IoTDBThriftConnectorV2.java  | 312 +++++++++++++++++++++
 ...nsferInsertNodeTabletInsertionEventHandler.java |  51 ++++
 ...ipeTransferRawTabletInsertionEventHandler.java} |  29 +-
 .../PipeTransferTabletInsertionEventHandler.java   | 131 +++++++++
 .../PipeTransferTsFileInsertionEventHandler.java   | 222 +++++++++++++++
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   8 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |   4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   4 +-
 .../event/realtime/PipeRealtimeCollectEvent.java   |   8 +-
 .../task/subtask/PipeConnectorSubtaskManager.java  |  20 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   2 +-
 32 files changed, 1003 insertions(+), 184 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 31ebb405907..435b3e0b41f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -106,7 +106,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       final PipeMeta pipeMetaFromDataNode =
           pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
       if (pipeMetaFromDataNode == null) {
-        LOGGER.warn(
+        LOGGER.info(
             "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
                 + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
             pipeMetaOnConfigNode);
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
deleted file mode 100644
index ae9849eb3cc..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.access;
-
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Type;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-import java.io.IOException;
-import java.util.List;
-
-public interface RowIterator {
-
-  /**
-   * Returns {@code true} if the iteration has more rows.
-   *
-   * @return {@code true} if the iteration has more rows
-   */
-  boolean hasNextRow();
-
-  /**
-   * Returns the next row in the iteration.
-   *
-   * <p>Note that the Row instance returned by this method each time is the 
same instance. In other
-   * words, calling {@code next()} will only change the member variables 
inside the Row instance,
-   * but will not generate a new Row instance.
-   *
-   * @return the next element in the iteration
-   * @throws IOException if any I/O errors occur
-   */
-  Row next() throws IOException;
-
-  /** Resets the iteration. */
-  void reset();
-
-  /**
-   * Returns the actual column index of the given column name.
-   *
-   * @param columnName the column name in Path form
-   * @throws PipeParameterNotValidException if the given column name is not 
existed
-   * @return the actual column index of the given column name
-   */
-  int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
-
-  /**
-   * Returns the column names
-   *
-   * @return the column names
-   */
-  List<Path> getColumnNames();
-
-  /**
-   * Returns the column data types
-   *
-   * @return the column data types
-   */
-  List<Type> getColumnTypes();
-}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b156ec47851..972ffeea774 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -220,4 +221,27 @@ public class ClientPoolFactory {
               .getConfig());
     }
   }
+
+  public static class AsyncPipeDataTransferServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, 
AsyncPipeDataTransferServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> 
createClientPool(
+        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncPipeDataTransferServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+              .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+              .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+              .build()
+              .getConfig());
+    }
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
new file mode 100644
index 00000000000..534349460f1
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncClient
+    implements ThriftClient {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class);
+
+  private final boolean printLogWhenEncounterException;
+
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager;
+
+  private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true);
+
+  public AsyncPipeDataTransferServiceClient(
+      ThriftClientProperty property,
+      TEndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager)
+      throws IOException {
+    super(
+        property.getProtocolFactory(),
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(
+            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+    this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    super.onError(e);
+    ThriftClient.resolveException(e, this);
+    returnSelf();
+  }
+
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return printLogWhenEncounterException;
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be 
triggered after the
+   * RPC is finished.
+   */
+  public void returnSelf() {
+    if (shouldReturnSelf.get()) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+
+  public void setShouldReturnSelf(boolean shouldReturnSelf) {
+    this.shouldReturnSelf.set(shouldReturnSelf);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  private boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      if (printLogWhenEncounterException) {
+        LOGGER.error("Unexpected exception occurs in {} : {}", this, 
e.getMessage());
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint);
+  }
+
+  public static class Factory
+      extends AsyncThriftClientFactory<TEndPoint, 
AsyncPipeDataTransferServiceClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager,
+        ThriftClientProperty thriftClientProperty,
+        String threadName) {
+      super(clientManager, thriftClientProperty, threadName);
+    }
+
+    @Override
+    public void destroyObject(
+        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> 
pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncPipeDataTransferServiceClient> 
makeObject(TEndPoint endPoint)
+        throws Exception {
+      return new DefaultPooledObject<>(
+          new AsyncPipeDataTransferServiceClient(
+              thriftClientProperty,
+              endPoint,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> 
pooledObject) {
+      return pooledObject.getObject().isReady();
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 8bf376138e3..e8ef0018810 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -23,6 +23,8 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
 public enum BuiltinPipePlugin {
@@ -36,6 +38,8 @@ public enum BuiltinPipePlugin {
   // connectors
   DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
   IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class),
+  IOTDB_THRIFT_CONNECTOR_V1("iotdb_thrift_connector_v1", 
IoTDBThriftConnectorV1.class),
+  IOTDB_THRIFT_CONNECTOR_V2("iotdb_thrift_connector_v2", 
IoTDBThriftConnectorV2.class),
   IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", 
IoTDBSyncConnectorV1_1.class),
   ;
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index 315e1347a6d..099bdd15557 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -36,43 +36,43 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 public class IoTDBThriftConnector implements PipeConnector {
 
   @Override
-  public void validate(PipeParameterValidator validator) {
+  public final void validate(PipeParameterValidator validator) {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void customize(
+  public final void customize(
       PipeParameters parameters, PipeConnectorRuntimeConfiguration 
configuration) {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void handshake() {
+  public final void handshake() {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void heartbeat() {
+  public final void heartbeat() {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+  public final void transfer(TabletInsertionEvent tabletInsertionEvent) {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+  public final void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void transfer(Event event) {
+  public final void transfer(Event event) {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void close() {
+  public final void close() {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
similarity index 74%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
copy to 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
index 7dac858ae3f..e708767aac6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
@@ -17,19 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-public enum IoTDBThriftConnectorVersion {
-  VERSION_ONE((byte) 1),
-  ;
-
-  private final byte version;
-
-  IoTDBThriftConnectorVersion(byte type) {
-    this.version = type;
-  }
-
-  public byte getVersion() {
-    return version;
-  }
-}
+public class IoTDBThriftConnectorV1 extends IoTDBThriftConnector {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
similarity index 74%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
copy to 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
index 7dac858ae3f..c5aa3710ad2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
@@ -17,19 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-public enum IoTDBThriftConnectorVersion {
-  VERSION_ONE((byte) 1),
-  ;
-
-  private final byte version;
-
-  IoTDBThriftConnectorVersion(byte type) {
-    this.version = type;
-  }
-
-  public byte getVersion() {
-    return version;
-  }
-}
+public class IoTDBThriftConnectorV2 extends IoTDBThriftConnector {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
index 4230ee73a34..80b18288c3b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
@@ -21,15 +21,15 @@ package org.apache.iotdb.db.pipe.agent.receiver;
 
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 public interface IoTDBThriftReceiver {
 
-  IoTDBThriftConnectorVersion getVersion();
+  IoTDBThriftConnectorRequestVersion getVersion();
 
-  TPipeTransferResp handleTransferReq(
+  TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher);
 
   void handleExit();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index fdeda91ea56..663e169c0bf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.agent.receiver;
 
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftReceiverV1;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -37,11 +37,11 @@ public class PipeReceiverAgent {
 
   private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new 
ThreadLocal<>();
 
-  public TPipeTransferResp transfer(
+  public TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     final byte reqVersion = req.getVersion();
-    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
-      return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, 
schemaFetcher);
+    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+      return getReceiver(reqVersion).receive(req, partitionFetcher, 
schemaFetcher);
     } else {
       return new TPipeTransferResp(
           RpcUtils.getStatus(
@@ -71,7 +71,7 @@ public class PipeReceiverAgent {
   }
 
   private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
-    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+    if (reqVersion == 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
       receiverThreadLocal.set(new IoTDBThriftReceiverV1());
     } else {
       throw new UnsupportedOperationException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 7c92206de81..d0b9002d2d4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -25,9 +25,11 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+  public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
+
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
index 7dac858ae3f..670dd9eb6e0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
@@ -19,13 +19,14 @@
 
 package org.apache.iotdb.db.pipe.connector;
 
-public enum IoTDBThriftConnectorVersion {
-  VERSION_ONE((byte) 1),
+public enum IoTDBThriftConnectorRequestVersion {
+  VERSION_1((byte) 1),
+  VERSION_2((byte) 1),
   ;
 
   private final byte version;
 
-  IoTDBThriftConnectorVersion(byte type) {
+  IoTDBThriftConnectorRequestVersion(byte type) {
     this.version = type;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
index 3099cf008cc..53a45e83a87 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -64,9 +64,9 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 
-public class IoTDBSyncConnectorImplV1_1 implements PipeConnector {
+public class IoTDBSyncConnectorV1_1 implements PipeConnector {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncConnectorV1_1.class);
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
   public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1";
@@ -163,7 +163,7 @@ public class IoTDBSyncConnectorImplV1_1 implements 
PipeConnector {
             "IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent 
and PipeTabletInsertionEvent.");
       }
     } catch (TException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
       // the connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException(
@@ -193,7 +193,7 @@ public class IoTDBSyncConnectorImplV1_1 implements 
PipeConnector {
     try {
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "Network error when transfer tsFile insertion event: {}.", 
tsFileInsertionEvent, e);
       // The connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException("Network error when transfer tsFile 
insertion event.", e);
@@ -244,7 +244,7 @@ public class IoTDBSyncConnectorImplV1_1 implements 
PipeConnector {
         }
       }
     } catch (TException e) {
-      LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.", 
ipAddress, port), e);
+      LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.", 
ipAddress, port), e);
       throw new PipeConnectionException(e.getMessage(), e);
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
index db0047372db..c8f506fdded 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.db.pipe.connector.v1;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 3e1e0db312c..e863bad5dd9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
 import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -133,7 +132,7 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
             "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
       }
     } catch (TException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
       // the connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException(
@@ -184,7 +183,7 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     try {
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
-      LOGGER.error(
+      LOGGER.warn(
           "Network error when transfer tsfile insertion event: {}.", 
tsFileInsertionEvent, e);
       // the connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
index dfd6765b68d..5ab2b19c5a7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -61,7 +61,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   private RandomAccessFile writingFileWriter;
 
   @Override
-  public synchronized TPipeTransferResp handleTransferReq(
+  public synchronized TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     final short rawRequestType = req.getType();
     if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
@@ -297,7 +297,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   }
 
   @Override
-  public IoTDBThriftConnectorVersion getVersion() {
-    return IoTDBThriftConnectorVersion.VERSION_ONE;
+  public IoTDBThriftConnectorRequestVersion getVersion() {
+    return IoTDBThriftConnectorRequestVersion.VERSION_1;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
index 897ee2a29e7..d26939ec54f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.v1.request;
 
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -58,7 +58,7 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     filePieceReq.startWritingOffset = startWritingOffset;
     filePieceReq.filePiece = filePiece;
 
-    filePieceReq.version = 
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    filePieceReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
index 0e73b95af07..1314e5e5ae0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.v1.request;
 
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -51,7 +51,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq 
{
     fileSealReq.fileName = fileName;
     fileSealReq.fileLength = fileLength;
 
-    fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    fileSealReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
index da03903fd79..aceb9a5877b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.v1.request;
 
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -45,7 +45,7 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
 
     handshakeReq.timestampPrecision = timestampPrecision;
 
-    handshakeReq.version = 
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    handshakeReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
index 10098a9721e..157a4200fd6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -83,7 +83,7 @@ public class PipeTransferInsertNodeReq extends 
TPipeTransferReq {
 
     req.insertNode = insertNode;
 
-    req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    req.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
     req.body = insertNode.serializeToByteBuffer();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
index 758013d6c72..ead686a6acf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
 import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
@@ -47,12 +47,12 @@ public class PipeTransferTabletReq extends TPipeTransferReq 
{
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTabletReq.class);
   private Tablet tablet;
 
-  public static TPipeTransferReq toTPipeTransferReq(Tablet tablet) throws 
IOException {
+  public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws 
IOException {
     final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
 
     tabletReq.tablet = tablet;
 
-    tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    tabletReq.version = 
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
     tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
     tabletReq.body = tablet.serialize();
     return tabletReq;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
new file mode 100644
index 00000000000..e22d2620cc9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
+import 
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
+import 
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferRawTabletInsertionEventHandler;
+import 
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferTsFileInsertionEventHandler;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+
+public class IoTDBThriftConnectorV2 implements PipeConnector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
+      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+              .createClientManager(
+                  new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+  private final AtomicLong commitIdGenerator = new AtomicLong(0);
+  private final AtomicLong lastCommitId = new AtomicLong(0);
+  private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
+      new PriorityQueue<>(Comparator.comparing(o -> o.left));
+
+  private List<TEndPoint> nodeUrls;
+
+  public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
+    commitQueue.offer(
+        new Pair<>(
+            requestCommitId,
+            () ->
+                Optional.ofNullable(enrichedEvent)
+                    .ifPresent(
+                        event ->
+                            
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
+
+    while (!commitQueue.isEmpty()) {
+      final Pair<Long, Runnable> committer = commitQueue.peek();
+      if (lastCommitId.get() + 1 != committer.left) {
+        break;
+      }
+
+      committer.right.run();
+      lastCommitId.incrementAndGet();
+
+      commitQueue.poll();
+    }
+  }
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // node urls string should be like "localhost:6667,localhost:6668"
+    validator.validateRequiredAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY);
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    nodeUrls =
+        SessionUtils.parseSeedNodeUrls(
+            
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")));
+    if (nodeUrls.isEmpty()) {
+      throw new PipeException("Node urls is empty.");
+    }
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    final TEndPoint firstNodeUrl = nodeUrls.get(0);
+    try (IoTDBThriftConnectorClient client =
+        new IoTDBThriftConnectorClient(
+            new ThriftClientProperty.Builder()
+                
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+                
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+                .build(),
+            firstNodeUrl.getIp(),
+            firstNodeUrl.getPort())) {
+      final TPipeTransferResp resp =
+          client.pipeTransfer(
+              
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+      if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
+      }
+    } catch (TException e) {
+      LOGGER.warn(
+          String.format(
+              "Connect to receiver %s:%s error.", firstNodeUrl.getIp(), 
firstNodeUrl.getPort()),
+          e);
+      throw new PipeConnectionException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void heartbeat() {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      final long requestCommitId = commitIdGenerator.incrementAndGet();
+      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
+          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+      final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
+          PipeTransferInsertNodeReq.toTPipeTransferReq(
+              pipeInsertNodeTabletInsertionEvent.getInsertNode());
+      final PipeTransferInsertNodeTabletInsertionEventHandler 
pipeTransferInsertNodeReqHandler =
+          new PipeTransferInsertNodeTabletInsertionEventHandler(
+              requestCommitId, pipeInsertNodeTabletInsertionEvent, 
pipeTransferInsertNodeReq, this);
+
+      transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+    } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+      final long requestCommitId = commitIdGenerator.incrementAndGet();
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+      final PipeTransferTabletReq pipeTransferTabletReq =
+          
PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet());
+      final PipeTransferRawTabletInsertionEventHandler 
pipeTransferTabletReqHandler =
+          new PipeTransferRawTabletInsertionEventHandler(
+              requestCommitId, pipeTransferTabletReq, this);
+
+      transfer(requestCommitId, pipeTransferTabletReqHandler);
+    } else {
+      throw new NotImplementedException(
+          "IoTDBThriftConnectorV2 only support 
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
+    }
+  }
+
+  public void transfer(
+      long requestCommitId,
+      PipeTransferInsertNodeTabletInsertionEventHandler 
pipeTransferInsertNodeReqHandler) {
+    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
+
+    try {
+      final AsyncPipeDataTransferServiceClient client =
+          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+      try {
+        pipeTransferInsertNodeReqHandler.transfer(client);
+      } catch (TException e) {
+        LOGGER.warn(
+            String.format(
+                "Transfer insert node to receiver %s:%s error, retrying...",
+                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+            e);
+      }
+    } catch (Exception ex) {
+      pipeTransferInsertNodeReqHandler.onError(ex);
+      LOGGER.warn(
+          String.format(
+              "Failed to borrow client from client pool for receiver %s:%s.",
+              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+          ex);
+    }
+  }
+
+  public void transfer(
+      long requestCommitId,
+      PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) 
{
+    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
+
+    try {
+      final AsyncPipeDataTransferServiceClient client =
+          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+      try {
+        pipeTransferTabletReqHandler.transfer(client);
+      } catch (TException e) {
+        LOGGER.warn(
+            String.format(
+                "Transfer tablet to receiver %s:%s error, retrying...",
+                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+            e);
+      }
+    } catch (Exception ex) {
+      pipeTransferTabletReqHandler.onError(ex);
+      LOGGER.warn(
+          String.format(
+              "Failed to borrow client from client pool for receiver %s:%s.",
+              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+          ex);
+    }
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+      throw new NotImplementedException(
+          "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.");
+    }
+
+    final long requestCommitId = commitIdGenerator.incrementAndGet();
+    final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
+        (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+    final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
+        new PipeTransferTsFileInsertionEventHandler(
+            requestCommitId, pipeTsFileInsertionEvent, this);
+
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+    transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
+  }
+
+  public void transfer(
+      long requestCommitId,
+      PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler) {
+    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
+
+    try {
+      final AsyncPipeDataTransferServiceClient client =
+          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+      try {
+        pipeTransferTsFileInsertionEventHandler.transfer(client);
+      } catch (TException e) {
+        LOGGER.warn(
+            String.format(
+                "Transfer tsfile to receiver %s:%s error, retrying...",
+                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+            e);
+      }
+    } catch (Exception ex) {
+      pipeTransferTsFileInsertionEventHandler.onError(ex);
+      LOGGER.warn(
+          String.format(
+              "Failed to borrow client from client pool for receiver %s:%s.",
+              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+          ex);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) {
+    LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic 
event: {}.", event);
+  }
+
+  @Override
+  public void close() {
+    isClosed.set(true);
+  }
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
new file mode 100644
index 00000000000..c8d95c8691e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.jetbrains.annotations.Nullable;
+
+public class PipeTransferInsertNodeTabletInsertionEventHandler
+    extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+  public PipeTransferInsertNodeTabletInsertionEventHandler(
+      long requestCommitId,
+      @Nullable EnrichedEvent event,
+      TPipeTransferReq req,
+      IoTDBThriftConnectorV2 connector) {
+    super(requestCommitId, event, req, connector);
+  }
+
+  @Override
+  protected void doTransfer(AsyncPipeDataTransferServiceClient client, 
TPipeTransferReq req)
+      throws TException {
+    client.pipeTransfer(req, this);
+  }
+
+  @Override
+  protected void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId) {
+    connector.transfer(requestCommitId, this);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
similarity index 51%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
copy to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
index 4230ee73a34..d0096eaafc2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
@@ -17,20 +17,31 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent.receiver;
+package org.apache.iotdb.db.pipe.connector.v2.handler;
 
-import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
-public interface IoTDBThriftReceiver {
+import org.apache.thrift.TException;
 
-  IoTDBThriftConnectorVersion getVersion();
+public class PipeTransferRawTabletInsertionEventHandler
+    extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
-  TPipeTransferResp handleTransferReq(
-      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher);
+  public PipeTransferRawTabletInsertionEventHandler(
+      long requestCommitId, TPipeTransferReq req, IoTDBThriftConnectorV2 
connector) {
+    super(requestCommitId, null, req, connector);
+  }
 
-  void handleExit();
+  @Override
+  protected void doTransfer(AsyncPipeDataTransferServiceClient client, 
TPipeTransferReq req)
+      throws TException {
+    client.pipeTransfer(req, this);
+  }
+
+  @Override
+  protected void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId) {
+    connector.transfer(requestCommitId, this);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
new file mode 100644
index 00000000000..6b150c53357
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class PipeTransferTabletInsertionEventHandler<E extends 
TPipeTransferResp>
+    implements AsyncMethodCallback<E> {
+
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(PipeTransferInsertNodeTabletInsertionEventHandler.class);
+
+  private final long requestCommitId;
+  private final EnrichedEvent event;
+  private final TPipeTransferReq req;
+
+  private final IoTDBThriftConnectorV2 connector;
+
+  private static final long MAX_RETRY_WAIT_TIME_MS =
+      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * 
Math.pow(2, 5));
+  private int retryCount = 0;
+
+  public PipeTransferTabletInsertionEventHandler(
+      long requestCommitId,
+      @Nullable EnrichedEvent event,
+      TPipeTransferReq req,
+      IoTDBThriftConnectorV2 connector) {
+    this.requestCommitId = requestCommitId;
+    this.event = event;
+    this.req = req;
+    this.connector = connector;
+
+    Optional.ofNullable(event)
+        .ifPresent(
+            e -> 
e.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName()));
+  }
+
+  public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
+    doTransfer(client, req);
+  }
+
+  protected abstract void doTransfer(
+      AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) throws 
TException;
+
+  @Override
+  public void onComplete(TPipeTransferResp response) {
+    // just in case
+    if (response == null) {
+      onError(new PipeException("TPipeTransferResp is null"));
+      return;
+    }
+
+    if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      connector.commit(requestCommitId, event);
+    } else {
+      onError(new PipeException(response.getStatus().getMessage()));
+    }
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    ++retryCount;
+
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            Thread.sleep(
+                Math.min(
+                    (long)
+                        
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
+                            * Math.pow(2d, retryCount - 1d)),
+                    MAX_RETRY_WAIT_TIME_MS));
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during retrying", e);
+          }
+
+          if (connector.isClosed()) {
+            LOGGER.info(
+                "IoTDBThriftConnectorV2 has been stopped, we will not retry 
this request {} after {} times",
+                req,
+                retryCount,
+                exception);
+          } else {
+            LOGGER.warn(
+                "IoTDBThriftConnectorV2 failed to transfer request {} after {} 
times, retrying...",
+                req,
+                retryCount,
+                exception);
+
+            retryTransfer(connector, requestCommitId);
+          }
+        });
+  }
+
+  protected abstract void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
new file mode 100644
index 00000000000..21c244f0537
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTransferTsFileInsertionEventHandler
+    implements AsyncMethodCallback<TPipeTransferResp> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
+
+  private final long requestCommitId;
+  private final PipeTsFileInsertionEvent event;
+  private final IoTDBThriftConnectorV2 connector;
+
+  private final File tsFile;
+  private final int readFileBufferSize;
+  private final byte[] readBuffer;
+  private long position;
+
+  private RandomAccessFile reader;
+
+  private final AtomicBoolean isSealSignalSent;
+
+  private AsyncPipeDataTransferServiceClient client;
+
+  private static final long MAX_RETRY_WAIT_TIME_MS =
+      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * 
Math.pow(2, 5));
+  private int retryCount = 0;
+
+  public PipeTransferTsFileInsertionEventHandler(
+      long requestCommitId, PipeTsFileInsertionEvent event, 
IoTDBThriftConnectorV2 connector)
+      throws FileNotFoundException {
+    this.requestCommitId = requestCommitId;
+    this.event = event;
+    this.connector = connector;
+
+    tsFile = event.getTsFile();
+    readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    readBuffer = new byte[readFileBufferSize];
+    position = 0;
+
+    reader = new RandomAccessFile(tsFile, "r");
+
+    isSealSignalSent = new AtomicBoolean(false);
+
+    
event.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+  }
+
+  public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException, IOException {
+    this.client = client;
+    client.setShouldReturnSelf(false);
+
+    final int readLength = reader.read(readBuffer);
+
+    if (readLength == -1) {
+      isSealSignalSent.set(true);
+      client.pipeTransfer(
+          PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), 
tsFile.length()), this);
+      return;
+    }
+
+    client.pipeTransfer(
+        PipeTransferFilePieceReq.toTPipeTransferReq(
+            tsFile.getName(),
+            position,
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength)),
+        this);
+    position += readLength;
+  }
+
+  @Override
+  public void onComplete(TPipeTransferResp response) {
+    if (isSealSignalSent.get()) {
+      if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        onError(
+            new PipeException(
+                String.format(
+                    "Seal file %s error, result status %s.", tsFile, 
response.getStatus())));
+        return;
+      }
+
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Failed to close file reader.", e);
+      } finally {
+        if (client != null) {
+          client.setShouldReturnSelf(true);
+          client.returnSelf();
+        }
+
+        connector.commit(requestCommitId, event);
+      }
+      return;
+    }
+
+    // if the isSealSignalSent is false, then the response must be a 
PipeTransferFilePieceResp
+    try {
+      final PipeTransferFilePieceResp resp =
+          PipeTransferFilePieceResp.fromTPipeTransferResp(response);
+
+      // this case only happens when the connection is broken, and the 
connector is reconnected
+      // to the receiver, then the receiver will redirect the file position to 
the last position
+      final long code = resp.getStatus().getCode();
+
+      if (code == 
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+        position = resp.getEndWritingOffset();
+        reader.seek(position);
+        LOGGER.info(String.format("Redirect file position to %s.", position));
+      } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException(
+            String.format("Transfer file %s error, result status %s.", tsFile, 
resp.getStatus()));
+      }
+
+      transfer(client);
+    } catch (Exception e) {
+      onError(e);
+    }
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close file reader.", e);
+    } finally {
+      if (client != null) {
+        client.setShouldReturnSelf(true);
+        client.returnSelf();
+      }
+    }
+
+    ++retryCount;
+
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            Thread.sleep(
+                Math.min(
+                    (long)
+                        
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
+                            * Math.pow(2d, retryCount - 1d)),
+                    MAX_RETRY_WAIT_TIME_MS));
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during retrying", e);
+          }
+
+          if (connector.isClosed()) {
+            LOGGER.info(
+                "IoTDBThriftConnectorV2 has been stopped, we will not retry to 
transfer tsfile {}.",
+                tsFile,
+                exception);
+          } else {
+            LOGGER.warn(
+                "IoTDBThriftConnectorV2 failed to transfer tsfile {} after {} 
times, retrying...",
+                tsFile,
+                retryCount,
+                exception);
+
+            try {
+              position = 0;
+              reader = new RandomAccessFile(tsFile, "r");
+              isSealSignalSent.set(false);
+
+              connector.transfer(requestCommitId, this);
+            } catch (FileNotFoundException e) {
+              LOGGER.error("Exception occurred when retrying...", e);
+              onError(e);
+            }
+          }
+        });
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 27975236127..f2df19bf2b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -57,7 +57,7 @@ public abstract class EnrichedEvent implements Event {
     boolean isSuccessful = true;
     synchronized (this) {
       if (referenceCount.get() == 0) {
-        isSuccessful = increaseResourceReferenceCount(holderMessage);
+        isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
       }
       referenceCount.incrementAndGet();
     }
@@ -71,7 +71,7 @@ public abstract class EnrichedEvent implements Event {
    * @return true if the reference count is increased successfully, false if 
the event is not
    *     controlled by the invoker, which means the data stored in the event 
is not safe to use
    */
-  public abstract boolean increaseResourceReferenceCount(String holderMessage);
+  public abstract boolean internallyIncreaseResourceReferenceCount(String 
holderMessage);
 
   /**
    * Decrease the reference count of this event. If the reference count is 
decreased to 0, the event
@@ -85,7 +85,7 @@ public abstract class EnrichedEvent implements Event {
     boolean isSuccessful = true;
     synchronized (this) {
       if (referenceCount.get() == 1) {
-        isSuccessful = decreaseResourceReferenceCount(holderMessage);
+        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
         reportProgress();
       }
       referenceCount.decrementAndGet();
@@ -100,7 +100,7 @@ public abstract class EnrichedEvent implements Event {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false 
otherwise
    */
-  public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+  public abstract boolean internallyDecreaseResourceReferenceCount(String 
holderMessage);
 
   private void reportProgress() {
     if (pipeTaskMeta != null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 8a3f9ca8a77..5623d7715a6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -70,7 +70,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean increaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
     try {
       PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), 
walEntryHandler);
       return true;
@@ -85,7 +85,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public boolean decreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     try {
       PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
       return true;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d82313dad8d..40aaf53c1ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -107,7 +107,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean increaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
     try {
       tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
       return true;
@@ -122,7 +122,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   }
 
   @Override
-  public boolean decreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     try {
       PipeResourceManager.file().decreaseFileReference(tsFile);
       return true;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
index 06410b26a02..cb84d0bc11a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
@@ -95,8 +95,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean increaseResourceReferenceCount(String holderMessage) {
-    return event.increaseResourceReferenceCount(holderMessage);
+  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+    return event.internallyIncreaseResourceReferenceCount(holderMessage);
   }
 
   @Override
@@ -109,8 +109,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
   }
 
   @Override
-  public boolean decreaseResourceReferenceCount(String holderMessage) {
-    return event.decreaseResourceReferenceCount(holderMessage);
+  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+    return event.internallyDecreaseResourceReferenceCount(holderMessage);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index fadb6a0184f..829e0bb993b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -24,8 +24,9 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1;
+import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorV1_1;
 import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -61,11 +62,15 @@ public class PipeConnectorSubtaskManager {
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
 
       PipeConnector pipeConnector;
-      if 
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()))
 {
+      if 
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+          || 
connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V1.getPipePluginName()))
 {
         pipeConnector = new IoTDBThriftConnectorV1();
+      } else if (connectorKey.equals(
+          BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V2.getPipePluginName())) {
+        pipeConnector = new IoTDBThriftConnectorV2();
       } else if (connectorKey.equals(
           BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) {
-        pipeConnector = new IoTDBSyncConnectorImplV1_1();
+        pipeConnector = new IoTDBSyncConnectorV1_1();
       } else {
         pipeConnector = 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
       }
@@ -126,15 +131,6 @@ public class PipeConnectorSubtaskManager {
     
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
   }
 
-  public PipeConnectorSubtask getPipeConnectorSubtask(String 
attributeSortedString) {
-    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(
-          "Failed to get PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
-    }
-
-    return 
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
-  }
-
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
       String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..4fbe17270f6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -2111,7 +2111,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
-    return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher);
+    return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher);
   }
 
   @Override

Reply via email to