This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6517daf661 [IOTDB-3950] Transport layer optimization for sync modules 
(#7038)
6517daf661 is described below

commit 6517daf661a9124b6bab6a5f1a0d57235e00b035
Author: Chen YZ <[email protected]>
AuthorDate: Wed Aug 24 20:16:39 2022 +0800

    [IOTDB-3950] Transport layer optimization for sync modules (#7038)
---
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |   6 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     |  13 +-
 ...ortClientMock.java => MockTransportClient.java} |  42 +-
 .../db/integration/sync/TransportHandlerMock.java  |  58 ---
 .../apache/iotdb/commons/sync/SyncConstant.java    |   2 +
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  10 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |  10 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |  49 +--
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   2 +-
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |   2 +-
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  |   2 +-
 .../db/sync/common/persistence/SyncLogReader.java  |   2 +-
 .../db/sync/common/persistence/SyncLogWriter.java  |   2 +-
 .../manager => sender/pipe}/PipeMessage.java       |   2 +-
 .../db/sync/sender/service/TransportHandler.java   | 126 ------
 .../db/sync/transport/client/ClientWrapper.java    | 124 ------
 .../db/sync/transport/client/ITransportClient.java |  23 +-
 .../transport/client/IoTDBSinkTransportClient.java | 450 ++++++---------------
 .../db/sync/transport/client/SenderManager.java    | 120 ++++++
 .../transport/client/TransportClientFactory.java   |  68 ++++
 .../db/sync/transport/conf/TransportConfig.java    |  45 ---
 .../db/sync/transport/server/ReceiverManager.java  | 323 +++++++--------
 .../db/sync/receiver/manager/SyncInfoTest.java     |   1 +
 .../db/sync/receiver/recovery/SyncLogTest.java     |   2 +-
 .../iotdb/db/sync/transport/SyncTransportTest.java | 173 +++++++-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +-
 thrift/src/main/thrift/client.thrift               |  17 +-
 27 files changed, 702 insertions(+), 975 deletions(-)

diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 67a4ab4618..a6115d536b 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -166,16 +166,16 @@ public class IoTDBSyncReceiverIT {
               null));
       planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
       for (PhysicalPlan plan : planList) {
-        client.senderTransport(new SchemaPipeData(plan, serialNum++));
+        client.send(new SchemaPipeData(plan, serialNum++));
       }
       List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
       SyncTestUtil.renameTsFiles(tsFiles);
       for (File f : tsFiles) {
-        client.senderTransport(new TsFilePipeData(f.getPath(), serialNum++));
+        client.send(new TsFilePipeData(f.getPath(), serialNum++));
       }
       Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 
33, 38);
       PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
-      client.senderTransport(pipeData);
+      client.send(pipeData);
 
       // wait collector to load pipe data
       Thread.sleep(1000);
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index 21952d47c9..368b0ff0c8 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -22,14 +22,13 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -63,8 +62,7 @@ public class IoTDBSyncSenderIT {
   private static final String pipeSinkName = "test_pipesink";
   private static final String pipeName = "test_pipe";
 
-  private TransportHandlerMock handler;
-  private TransportClientMock transportClient;
+  private MockTransportClient transportClient;
 
   private final Map<String, List<PipeData>> resultMap = new HashMap<>();
   private static final TsFilePipeData simpleTsFilePipeData =
@@ -89,10 +87,7 @@ public class IoTDBSyncSenderIT {
     Class.forName(Config.JDBC_DRIVER_NAME);
 
     IoTDBPipeSink pipeSink = new IoTDBPipeSink(pipeSinkName);
-    TsFilePipe pipe = new TsFilePipe(0L, pipeName, pipeSink, 0L, true);
-    transportClient = new TransportClientMock(pipe, pipeSink.getIp(), 
pipeSink.getPort());
-    handler = new TransportHandlerMock(pipe, pipeSink, transportClient);
-    TransportHandler.setDebugTransportHandler(handler);
+    transportClient = new MockTransportClient();
     LocalSyncInfoFetcher.getInstance().reset();
   }
 
@@ -255,6 +250,7 @@ public class IoTDBSyncSenderIT {
       statement.execute("create pipesink " + pipeSinkName + " as iotdb");
       statement.execute("create pipe " + pipeName + " to " + pipeSinkName);
     }
+    
SyncService.getInstance().getSenderManager().setTransportClient(transportClient);
   }
 
   private void restart() throws Exception {
@@ -262,6 +258,7 @@ public class IoTDBSyncSenderIT {
     EnvironmentUtils.shutdownDaemon();
     WALRecoverManager.getInstance().clear();
     EnvironmentUtils.reactiveDaemon();
+    
SyncService.getInstance().getSenderManager().setTransportClient(transportClient);
   }
 
   private void startPipe() throws Exception {
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
similarity index 59%
rename from 
integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
rename to 
integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
index aa10f630d0..25b59f10b0 100644
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
+++ 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
@@ -19,48 +19,34 @@
 package org.apache.iotdb.db.integration.sync;
 
 import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.transport.client.ITransportClient;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class TransportClientMock implements ITransportClient {
-  private Pipe pipe;
-  private String ipAddress;
-  private int port;
+public class MockTransportClient implements ITransportClient {
 
-  private List<PipeData> pipeDataList;
-
-  public TransportClientMock(Pipe pipe, String ipAddress, int port) {
-    this.pipe = pipe;
-    this.ipAddress = ipAddress;
-    this.port = port;
+  private final List<PipeData> pipeDataList;
 
+  public MockTransportClient() {
     this.pipeDataList = new ArrayList<>();
   }
 
-  public void resetInfo(Pipe pipe, String ipAddress, int port) {
-    this.pipe = pipe;
-    this.ipAddress = ipAddress;
-    this.port = port;
+  public List<PipeData> getPipeDataList() {
+    return pipeDataList;
   }
 
   @Override
-  public void run() {
-    try {
-      while (!Thread.currentThread().isInterrupted()) {
-        PipeData pipeData = pipe.take();
-        pipeDataList.add(pipeData);
-        pipe.commit();
-      }
-    } catch (InterruptedException e) {
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+  public boolean handshake() {
+    return true;
   }
 
-  public List<PipeData> getPipeDataList() {
-    return pipeDataList;
+  @Override
+  public boolean send(PipeData pipeData) {
+    pipeDataList.add(pipeData);
+    return true;
   }
+
+  @Override
+  public void close() {}
 }
diff --git 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
 
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
deleted file mode 100644
index d54e1f9805..0000000000
--- 
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.integration.sync;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
-
-public class TransportHandlerMock extends TransportHandler {
-  private Pipe pipe;
-
-  public TransportHandlerMock(
-      Pipe pipe, IoTDBPipeSink pipeSink, TransportClientMock 
transportClientMock) {
-    super(pipe, pipeSink);
-    this.transportClient = transportClientMock;
-  }
-
-  @Override
-  protected void resetTransportClient(Pipe pipe) {
-    try {
-      super.close();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    super.resetTransportClient(pipe);
-    IoTDBPipeSink pipeSink = (IoTDBPipeSink) pipe.getPipeSink();
-    ((TransportClientMock) this.transportClient)
-        .resetInfo(pipe, pipeSink.getIp(), pipeSink.getPort());
-    this.pipe = pipe;
-
-    this.transportExecutorService =
-        IoTDBThreadPoolFactory.newSingleThreadExecutor(
-            ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
-  }
-
-  public TransportClientMock getTransportClientMock() {
-    return (TransportClientMock) transportClient;
-  }
-}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index bf9f4afb09..81f54e7026 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -72,6 +72,8 @@ public class SyncConstant {
   public static final int DATA_CHUNK_SIZE =
       Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
 
+  public static final String PATCH_SUFFIX = ".patch";
+
   /** receiver */
   public static final String RECEIVER_DIR_NAME = "receiver";
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index c5b75f3bab..bab10ce4e3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1462,15 +1462,13 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff, ByteBuffer digest)
-      throws TException {
-    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  public TSStatus sendPipeData(ByteBuffer buff) throws TException {
+    return SyncService.getInstance().transportPipeData(buff);
   }
 
   @Override
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer 
digest)
-      throws TException {
-    return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+  public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) 
throws TException {
+    return SyncService.getInstance().transportFile(metaInfo, buff);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index c439813c1f..0654444a79 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -2102,15 +2102,13 @@ public class TSServiceImpl implements 
IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff, ByteBuffer digest)
-      throws TException {
-    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  public TSStatus sendPipeData(ByteBuffer buff) throws TException {
+    return SyncService.getInstance().transportPipeData(buff);
   }
 
   @Override
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer 
digest)
-      throws TException {
-    return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+  public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) 
throws TException {
+    return SyncService.getInstance().transportFile(metaInfo, buff);
   }
 
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java 
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 46eaa93eab..7fdcb7cb91 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
@@ -39,14 +40,14 @@ import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
 import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+import org.apache.iotdb.db.sync.transport.client.SenderManager;
 import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
@@ -72,15 +73,16 @@ public class SyncService implements IService {
 
   private Pipe runningPipe;
 
-  private TransportHandler transportHandler;
-
   /* handle external Pipe */
   private ExtPipePluginManager extPipePluginManager;
 
   private ISyncInfoFetcher syncInfoFetcher = 
LocalSyncInfoFetcher.getInstance();
 
+  /* handle rpc send logic in sender-side*/
+  private SenderManager senderManager;
+
   /* handle rpc in receiver-side*/
-  private ReceiverManager receiverManager;
+  private final ReceiverManager receiverManager;
 
   private SyncService() {
     receiverManager = new ReceiverManager();
@@ -102,15 +104,13 @@ public class SyncService implements IService {
     return receiverManager.handshake(identityInfo);
   }
 
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff, ByteBuffer digest)
+  public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff)
       throws TException {
-    return receiverManager.transportData(metaInfo, buff, digest);
+    return receiverManager.transportFile(metaInfo, buff);
   }
 
-  // TODO: this will be deleted later
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer 
digest)
-      throws TException {
-    return receiverManager.checkFileDigest(metaInfo, digest);
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+    return receiverManager.transportPipeData(buff);
   }
 
   public void handleClientExit() {
@@ -168,8 +168,7 @@ public class SyncService implements IService {
     runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(plan, 
runningPipeSink, currentTime);
     if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
       try {
-        transportHandler =
-            TransportHandler.getNewTransportHandler(runningPipe, 
(IoTDBPipeSink) runningPipeSink);
+        senderManager = new SenderManager(runningPipe, (IoTDBPipeSink) 
runningPipeSink);
       } catch (ClassCastException e) {
         logger.error(
             String.format(
@@ -193,7 +192,7 @@ public class SyncService implements IService {
     if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
         runningPipe.stop();
-        transportHandler.stop();
+        senderManager.stop();
       } else { // for external PIPE
         // == pause externalPipeProcessor's task
         if (extPipePluginManager != null) {
@@ -217,7 +216,7 @@ public class SyncService implements IService {
     if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
         runningPipe.start();
-        transportHandler.start();
+        senderManager.start();
       } else { // for external PIPE
         runningPipe.start();
         startExternalPipeManager(true);
@@ -230,7 +229,7 @@ public class SyncService implements IService {
     checkRunningPipeExistAndName(pipeName);
     try {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
-        if (!transportHandler.close()) {
+        if (!senderManager.close()) {
           throw new PipeException(
               String.format(
                   "Close pipe %s transport error after %s %s, please try 
again.",
@@ -390,7 +389,7 @@ public class SyncService implements IService {
     if (externalPipeSinkWriterFactory == null) {
       logger.error(
           String.format(
-              "startExternalPipeManager(), can not found ExternalPipe plugin 
for {}.",
+              "startExternalPipeManager(), can not found ExternalPipe plugin 
for %s.",
               extPipeSinkTypeName));
       throw new PipeException("Can not found ExternalPipe plugin for " + 
extPipeSinkTypeName + ".");
     }
@@ -446,7 +445,7 @@ public class SyncService implements IService {
     if (runningPipe != null && 
!Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
       try {
         runningPipe.stop();
-        transportHandler.stop();
+        senderManager.stop();
       } catch (PipeException e) {
         logger.warn(
             String.format("Stop pipe %s error when stop Sender Service.", 
runningPipe.getName()),
@@ -460,7 +459,7 @@ public class SyncService implements IService {
     if (runningPipe != null && 
!Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
       try {
         runningPipe.stop();
-        transportHandler.close();
+        senderManager.close();
         runningPipe.close();
       } catch (PipeException | InterruptedException e) {
         logger.warn(
@@ -503,15 +502,19 @@ public class SyncService implements IService {
     }
 
     if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
-      this.transportHandler =
-          TransportHandler.getNewTransportHandler(
-              runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
+      this.senderManager =
+          new SenderManager(runningPipe, (IoTDBPipeSink) 
runningPipe.getPipeSink());
       if (Pipe.PipeStatus.RUNNING.equals(runningPipe.getStatus())) {
-        transportHandler.start();
+        senderManager.start();
       }
     } else { // for external pipe
       // == start ExternalPipeProcessor for send data to external pipe plugin
       startExternalPipeManager(runningPipe.getStatus() == 
Pipe.PipeStatus.RUNNING);
     }
   }
+
+  @TestOnly
+  public SenderManager getSenderManager() {
+    return senderManager;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index f13483444d..ea85433f22 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.sync.common;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 
 import java.util.List;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index e74c6748ea..3a89cdc0f8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java 
b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 936d4a7ea7..6b08525723 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index ac5cd39397..b848ab1959 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 3408d2d5ef..01419e6f09 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 
 import java.io.BufferedWriter;
 import java.io.File;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
 b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
rename to 
server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
index 8d81daaaf9..1a313d5158 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver.manager;
+package org.apache.iotdb.db.sync.sender.pipe;
 
 import java.util.Objects;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
deleted file mode 100644
index d8f96ac28e..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.sender.service;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.sync.SyncConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-public class TransportHandler {
-  private static final Logger logger = 
LoggerFactory.getLogger(TransportHandler.class);
-  private static TransportHandler DEBUG_TRANSPORT_HANDLER = null; // test only
-
-  private String pipeName;
-  private long createTime;
-  private final String localIP;
-  protected ITransportClient transportClient;
-  private final Pipe pipe;
-
-  protected ExecutorService transportExecutorService;
-  private Future transportFuture;
-
-  public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
-    this.pipe = pipe;
-    this.pipeName = pipe.getName();
-    this.createTime = pipe.getCreateTime();
-
-    this.transportExecutorService =
-        IoTDBThreadPoolFactory.newSingleThreadExecutor(
-            ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipeName);
-
-    this.localIP = getLocalIP(pipeSink);
-    this.transportClient =
-        new IoTDBSinkTransportClient(pipe, pipeSink.getIp(), 
pipeSink.getPort(), localIP);
-  }
-
-  private String getLocalIP(IoTDBPipeSink pipeSink) {
-    String localIP;
-    try {
-      InetAddress inetAddress = InetAddress.getLocalHost();
-      if (inetAddress.isLoopbackAddress()) {
-        try (final DatagramSocket socket = new DatagramSocket()) {
-          socket.connect(InetAddress.getByName(pipeSink.getIp()), 
pipeSink.getPort());
-          localIP = socket.getLocalAddress().getHostAddress();
-        }
-      } else {
-        localIP = inetAddress.getHostAddress();
-      }
-    } catch (UnknownHostException | SocketException e) {
-      logger.error(String.format("Get local host error when create transport 
handler."), e);
-      localIP = SyncConstant.UNKNOWN_IP;
-    }
-    return localIP;
-  }
-
-  public void start() {
-    transportFuture = transportExecutorService.submit(transportClient);
-  }
-
-  public void stop() {
-    if (transportFuture != null) {
-      transportFuture.cancel(true);
-    }
-  }
-
-  public boolean close() throws InterruptedException {
-    boolean isClosed;
-    transportExecutorService.shutdownNow();
-    isClosed =
-        transportExecutorService.awaitTermination(
-            SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, 
TimeUnit.MILLISECONDS);
-    return isClosed;
-  }
-
-  public static TransportHandler getNewTransportHandler(Pipe pipe, 
IoTDBPipeSink pipeSink) {
-    if (DEBUG_TRANSPORT_HANDLER == null) {
-      return new TransportHandler(pipe, pipeSink);
-    }
-    DEBUG_TRANSPORT_HANDLER.resetTransportClient(pipe); // test only
-    return DEBUG_TRANSPORT_HANDLER;
-  }
-
-  /** test */
-  @TestOnly
-  public static void setDebugTransportHandler(TransportHandler 
transportHandler) {
-    DEBUG_TRANSPORT_HANDLER = transportHandler;
-  }
-
-  @TestOnly
-  protected void resetTransportClient(Pipe pipe) {
-    this.pipeName = pipe.getName();
-    this.createTime = pipe.getCreateTime();
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
deleted file mode 100644
index b96d6b3ead..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.transport.client;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.sync.SyncConstant;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
-import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClientWrapper {
-  private static final Logger logger = 
LoggerFactory.getLogger(ClientWrapper.class);
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
-
-  private TTransport transport = null;
-  private volatile IClientRPCService.Client serviceClient = null;
-
-  /* remote IP address*/
-  private final String ipAddress;
-  /* remote port */
-  private final int port;
-  /* local IP address*/
-  private final String localIP;
-
-  private final Pipe pipe;
-
-  public ClientWrapper(Pipe pipe, String ipAddress, int port, String localIP) {
-    this.pipe = pipe;
-    this.ipAddress = ipAddress;
-    this.port = port;
-    this.localIP = localIP;
-  }
-
-  public IClientRPCService.Client getClient() {
-    return serviceClient;
-  }
-
-  /**
-   * create connection and handshake before sending messages
-   *
-   * @return true if success; false if failed to check IoTDB version.
-   * @throws SyncConnectionException cannot create connection to receiver
-   */
-  public boolean handshakeWithVersion() throws SyncConnectionException {
-    if (transport != null && transport.isOpen()) {
-      transport.close();
-    }
-
-    try {
-      transport =
-          RpcTransportFactory.INSTANCE.getTransport(
-              new TSocket(
-                  TConfigurationConst.defaultTConfiguration,
-                  ipAddress,
-                  port,
-                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
-                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
-      TProtocol protocol;
-      if (config.isRpcThriftCompressionEnable()) {
-        protocol = new TCompactProtocol(transport);
-      } else {
-        protocol = new TBinaryProtocol(transport);
-      }
-      serviceClient = new IClientRPCService.Client(protocol);
-
-      // Underlay socket open.
-      if (!transport.isOpen()) {
-        transport.open();
-      }
-
-      TSyncIdentityInfo identityInfo =
-          new TSyncIdentityInfo(
-              localIP, pipe.getName(), pipe.getCreateTime(), 
config.getIoTDBMajorVersion());
-      TSStatus status = serviceClient.handshake(identityInfo);
-      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logger.error("The receiver rejected the synchronization task because 
{}", status.message);
-        return false;
-      }
-    } catch (TException e) {
-      logger.warn("Cannot connect to the receiver because {}", e.getMessage());
-      throw new SyncConnectionException(
-          String.format("Cannot connect to the receiver because %s.", 
e.getMessage()));
-    }
-    return true;
-  }
-
-  public void close() {
-    if (transport != null) {
-      transport.close();
-      transport = null;
-    }
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
index 68a87d7655..c06398807d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
@@ -19,4 +19,25 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
-public interface ITransportClient extends Runnable {}
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+
+public interface ITransportClient {
+  /**
+   * Create connection and handshake before sending messages
+   *
+   * @return true if success; false if failed to check IoTDB version.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean handshake() throws SyncConnectionException;
+
+  /**
+   * Send {@link PipeData} to receiver and load.
+   *
+   * @return true if success; false if failed to send or load.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean send(PipeData pipeData) throws SyncConnectionException;
+
+  void close();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
index 14c5da4b50..53b6cdf119 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
@@ -24,33 +24,33 @@ import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
 
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 
 import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static 
org.apache.iotdb.db.sync.transport.conf.TransportConfig.isCheckFileDegistAgain;
 
 public class IoTDBSinkTransportClient implements ITransportClient {
 
@@ -60,8 +60,8 @@ public class IoTDBSinkTransportClient implements 
ITransportClient {
 
   private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
 
-  private final ClientWrapper serviceClient;
-  private final ClientWrapper heartbeatClient;
+  private TTransport transport = null;
+  private volatile IClientRPCService.Client serviceClient = null;
 
   /* remote IP address*/
   private final String ipAddress;
@@ -84,234 +84,137 @@ public class IoTDBSinkTransportClient implements 
ITransportClient {
     this.ipAddress = ipAddress;
     this.port = port;
     this.localIP = localIP;
-    serviceClient = new ClientWrapper(pipe, ipAddress, port, localIP);
-    heartbeatClient = new ClientWrapper(pipe, ipAddress, port, localIP);
   }
 
   /**
-   * Create thrift connection to receiver. (1) register pipe message, 
including pipeName, localIp
-   * and createTime (2) check IoTDB version to make sure compatibility
+   * Create thrift connection to receiver. Check IoTDB version to make sure 
compatibility
    *
-   * @return true if success; false if failed MaxNumberOfSyncFileRetry times.
+   * @return true if success; false if failed to check IoTDB version.
    * @throws SyncConnectionException cannot create connection to receiver
    */
+  @Override
   public synchronized boolean handshake() throws SyncConnectionException {
-    for (int handshakeCounter = 0;
-        handshakeCounter < config.getMaxNumberOfSyncFileRetry();
-        handshakeCounter++) {
-      try {
-        return serviceClient.handshakeWithVersion();
-      } catch (SyncConnectionException e) {
-        logger.warn(
-            String.format(
-                "Handshake error, retry %d/%d.",
-                handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
-      }
+    if (transport != null && transport.isOpen()) {
+      transport.close();
     }
-    if (!serviceClient.handshakeWithVersion()) {
-      logger.info(
-          String.format("Handshake failed %s times!", 
config.getMaxNumberOfSyncFileRetry()));
-      return false;
-    } else {
-      return true;
-    }
-  }
 
-  public boolean senderTransport(PipeData pipeData) throws 
SyncConnectionException {
-    if (pipeData instanceof TsFilePipeData) {
-      try {
-        for (File file : ((TsFilePipeData) pipeData).getTsFiles(true)) {
-          transportSingleFile(file);
-        }
-      } catch (IOException e) {
-        logger.error(String.format("Get tsfiles error, because %s.", e), e);
-        return false;
-      } catch (NoSuchAlgorithmException e) {
-        logger.error(String.format("Wrong message digest, because %s.", e), e);
-        return false;
+    try {
+      transport =
+          RpcTransportFactory.INSTANCE.getTransport(
+              new TSocket(
+                  TConfigurationConst.defaultTConfiguration,
+                  ipAddress,
+                  port,
+                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
+      TProtocol protocol;
+      if (config.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
       }
-    }
-
-    int retryCount = 0;
+      serviceClient = new IClientRPCService.Client(protocol);
 
-    while (true) {
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        logger.error(
-            String.format("After %s tries, stop the transport of current 
pipeData!", retryCount));
-        throw new SyncConnectionException(
-            String.format("Can not connect to receiver when transferring 
pipedata %s.", pipeData));
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
       }
 
-      try {
-        transportPipeData(pipeData);
-        logger.info(String.format("Finish pipeData %s transport.", pipeData));
-        break;
-      } catch (SyncConnectionException e) {
-        // handshake and retry
-        try {
-          if (!handshake()) {
-            logger.error(
-                String.format(
-                    "Handshake to receiver %s:%d error when transfer pipe data 
%s.",
-                    ipAddress, port, pipeData));
-            return false;
-          }
-        } catch (SyncConnectionException syncConnectionException) {
-          logger.error(
-              String.format(
-                  "Reconnect to receiver %s:%d error when transfer pipe data 
%s.",
-                  ipAddress, port, pipeData));
-          throw new SyncConnectionException(
-              String.format(
-                  "Reconnect to receiver error when transferring pipedata 
%s.", pipeData));
-        }
-      } catch (NoSuchAlgorithmException e) {
-        logger.error("Transport failed. ", e);
+      TSyncIdentityInfo identityInfo =
+          new TSyncIdentityInfo(
+              localIP, pipe.getName(), pipe.getCreateTime(), 
config.getIoTDBMajorVersion());
+      TSStatus status = serviceClient.handshake(identityInfo);
+      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("The receiver rejected the synchronization task because 
{}", status.message);
         return false;
       }
+    } catch (TException e) {
+      logger.warn("Cannot connect to the receiver because {}", e.getMessage());
+      throw new SyncConnectionException(
+          String.format("Cannot connect to the receiver because %s.", 
e.getMessage()));
     }
     return true;
   }
 
-  /** Transfer data of a tsfile to the receiver. */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
-  private void transportSingleFile(File file)
-      throws SyncConnectionException, NoSuchAlgorithmException {
-
-    MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-
-    int retryCount = 0;
-    while (true) {
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        throw new SyncConnectionException(
-            String.format("Connect to receiver error when transferring file 
%s.", file.getName()));
-      }
-
+  /**
+   * Send {@link PipeData} to receiver and load. If PipeData is 
TsFilePipeData, The TsFiles will be
+   * transferred before the PipeData transfer.
+   *
+   * @return true if success; false if failed to send or load.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  @Override
+  public boolean send(PipeData pipeData) throws SyncConnectionException {
+    if (pipeData instanceof TsFilePipeData) {
       try {
-        transportSingleFilePieceByPiece(file, messageDigest);
-
-        if (isCheckFileDegistAgain) {
-          // Check file digest as entirety.
-          try {
-            if (!checkFileDigest(file, messageDigest)) {
-              continue;
-            }
-          } catch (IOException e) {
-            logger.warn(
-                String.format(
-                    "Read from disk to make digest error, skip check file %s, 
because %s.",
-                    file.getName(), e));
-          }
-        }
-        break;
-      } catch (SyncConnectionException e) {
-        // handshake and retry
-        try {
-          if (!handshake()) {
-            throw new SyncConnectionException(
-                String.format(
-                    "Handshake with receiver error when transferring file 
%s.", file.getName()));
+        for (File file : ((TsFilePipeData) pipeData).getTsFiles(true)) {
+          if (!transportSingleFilePieceByPiece(file)) {
+            return false;
           }
-        } catch (SyncConnectionException syncConnectionException) {
-          throw new SyncConnectionException(
-              String.format(
-                  "Connect to receiver error when transferring file %s.", 
file.getName()));
         }
+      } catch (IOException e) {
+        logger.error(String.format("Get TsFiles error, because %s.", e), e);
+        return false;
       }
     }
-
-    logger.info("Receiver has received {} successfully.", 
file.getAbsoluteFile());
+    try {
+      return transportPipeData(pipeData);
+    } catch (IOException e) {
+      logger.error(String.format("Transport PipeData error, because %s.", e), 
e);
+      return false;
+    }
   }
 
-  private void transportSingleFilePieceByPiece(File file, MessageDigest 
messageDigest)
-      throws SyncConnectionException {
-
+  /**
+   * Transport file piece by piece.
+   *
+   * @return true if success; false if failed.
+   * @throws SyncConnectionException Connection exception, wait for a while 
and try again
+   * @throws IOException Serialize error.
+   */
+  private boolean transportSingleFilePieceByPiece(File file)
+      throws SyncConnectionException, IOException {
     // Cut the file into pieces to send
     long position = 0;
-
     long limit = getFileSizeLimit(file);
 
     // Try small piece to rebase the file position.
     byte[] buffer = new byte[TRANSFER_BUFFER_SIZE_IN_BYTES];
-
-    outer:
-    while (true) {
-
-      // Normal piece.
-      if (position != 0L && buffer.length != DATA_CHUNK_SIZE) {
-        buffer = new byte[DATA_CHUNK_SIZE];
-      }
-
-      int dataLength;
-      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
-        if (limit <= position) {
+    int dataLength;
+    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+      while (position < limit) {
+        // Normal piece.
+        if (position != 0L && buffer.length != DATA_CHUNK_SIZE) {
+          buffer = new byte[DATA_CHUNK_SIZE];
+        }
+        dataLength =
+            randomAccessFile.read(buffer, 0, Math.min(buffer.length, (int) 
(limit - position)));
+        if (dataLength == -1) {
           break;
         }
-        randomAccessFile.seek(position);
-        while ((dataLength =
-                randomAccessFile.read(buffer, 0, Math.min(buffer.length, (int) 
(limit - position))))
-            != -1) {
-          messageDigest.reset();
-          messageDigest.update(buffer, 0, dataLength);
-          ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
-          TSyncTransportMetaInfo metaInfo =
-              new TSyncTransportMetaInfo(TSyncTransportType.FILE, 
file.getName(), position);
+        ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
+        TSyncTransportMetaInfo metaInfo = new 
TSyncTransportMetaInfo(file.getName(), position);
 
-          TSStatus status = null;
-          int retryCount = 0;
-          while (true) {
-            retryCount++;
-            if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-              throw new SyncConnectionException(
-                  String.format(
-                      "Can not sync file %s after %s tries.",
-                      file.getAbsoluteFile(), 
config.getMaxNumberOfSyncFileRetry()));
-            }
-            try {
-              status =
-                  serviceClient
-                      .getClient()
-                      .transportData(metaInfo, buffToSend, 
ByteBuffer.wrap(messageDigest.digest()));
-            } catch (TException e) {
-              // retry
-              logger.error("TException happened! ", e);
-              continue;
-            }
-            break;
-          }
+        TSStatus status = serviceClient.sendFile(metaInfo, buffToSend);
 
-          if (status.code == TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
-            position = Long.parseLong(status.message);
-            continue outer;
-          } else if (status.code == 
TSStatusCode.SYNC_FILE_RETRY.getStatusCode()) {
-            logger.info(
-                "Receiver failed to receive data from {} because {}, retry.",
-                file.getAbsoluteFile(),
-                status.message);
-            continue outer;
-          } else if (status.code != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            logger.info(
-                "Receiver failed to receive data from {} because {}, abort.",
-                file.getAbsoluteFile(),
-                status.message);
-            throw new SyncConnectionException(status.message);
-          } else { // Success
-            position += dataLength;
-            if (position >= limit) {
-              break;
-            }
-          }
+        if ((status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode())) {
+          // Success
+          position += dataLength;
+        } else if (status.code == 
TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
+          position = Long.parseLong(status.message);
+        } else if (status.code == 
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
+          logger.error(
+              "Receiver failed to receive data from {} because {}, abort.",
+              file.getAbsoluteFile(),
+              status.message);
+          return false;
         }
-      } catch (IOException e) {
-        // retry
-        logger.error("IOException happened! ", e);
-      } catch (SyncConnectionException e) {
-        logger.error("Cannot sync data with receiver. ", e);
-        throw e;
       }
+    } catch (TException e) {
+      logger.error("Cannot sync data with receiver. ", e);
+      throw new SyncConnectionException(e);
     }
+    return true;
   }
 
   private long getFileSizeLimit(File file) {
@@ -327,142 +230,35 @@ public class IoTDBSinkTransportClient implements 
ITransportClient {
     return file.length();
   }
 
-  private boolean checkFileDigest(File file, MessageDigest messageDigest)
-      throws SyncConnectionException, IOException {
-    messageDigest.reset();
-    try (InputStream inputStream = new FileInputStream(file)) {
-      byte[] block = new byte[DATA_CHUNK_SIZE];
-      int length;
-      while ((length = inputStream.read(block)) > 0) {
-        messageDigest.update(block, 0, length);
-      }
-    }
-
-    TSyncTransportMetaInfo metaInfo =
-        new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), 0);
-
-    TSStatus status;
-    int retryCount = 0;
-
-    while (true) {
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        throw new SyncConnectionException(
-            String.format(
-                "Can not sync file %s after %s tries.",
-                file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
-      }
-      try {
-        status =
-            serviceClient
-                .getClient()
-                .checkFileDigest(metaInfo, 
ByteBuffer.wrap(messageDigest.digest()));
-      } catch (TException e) {
-        // retry
-        logger.error("TException happens! ", e);
-        continue;
-      }
-      break;
-    }
-
-    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      logger.error("Digest check of tsfile {} failed, retry", 
file.getAbsoluteFile());
-      return false;
-    }
-    return true;
-  }
-
-  private void transportPipeData(PipeData pipeData)
-      throws SyncConnectionException, NoSuchAlgorithmException {
-
-    MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-
-    int retryCount = 0;
-    while (true) {
-
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        throw new SyncConnectionException(
-            String.format(
-                "Can not sync pipe data after %s tries.", 
config.getMaxNumberOfSyncFileRetry()));
-      }
-
-      try {
-        byte[] buffer = pipeData.serialize();
-        messageDigest.reset();
-        messageDigest.update(buffer);
-        ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
-
-        TSyncTransportMetaInfo metaInfo =
-            new TSyncTransportMetaInfo(
-                TSyncTransportType.findByValue(pipeData.getType().ordinal()), 
"fileName", 0);
-        TSStatus status =
-            serviceClient
-                .getClient()
-                .transportData(metaInfo, buffToSend, 
ByteBuffer.wrap(messageDigest.digest()));
-
-        if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          break;
-        } else {
-          logger.error("Digest check of pipeData failed, retry");
-        }
-      } catch (IOException | TException e) {
-        // retry
-        logger.error("Exception happened!", e);
-      }
-    }
-  }
-
   /**
-   * When an object implementing interface <code>Runnable</code> is used to 
create a thread,
-   * starting the thread causes the object's <code>run</code> method to be 
called in that separately
-   * executing thread.
-   *
-   * <p>The general contract of the method <code>run</code> is that it may 
take any action
-   * whatsoever.
+   * Transport and load PipeData
    *
-   * @see Thread#run()
+   * @return true if success; false if failed.
+   * @throws SyncConnectionException Connection exception, wait for a while 
and try again
+   * @throws IOException Serialize error.
    */
-  @Override
-  public void run() {
+  private boolean transportPipeData(PipeData pipeData) throws 
SyncConnectionException, IOException {
     try {
-      while (!Thread.currentThread().isInterrupted()) {
-        try {
-          if (!handshake()) {
-            SyncService.getInstance()
-                .receiveMsg(
-                    PipeMessage.MsgType.ERROR,
-                    String.format("Can not handshake with %s:%d.", ipAddress, 
port));
-          }
-          while (!Thread.currentThread().isInterrupted()) {
-            PipeData pipeData = pipe.take();
-            if (!senderTransport(pipeData)) {
-              logger.error(String.format("Can not transfer pipedata %s, skip 
it.", pipeData));
-              // can do something.
-              SyncService.getInstance()
-                  .receiveMsg(
-                      PipeMessage.MsgType.WARN,
-                      String.format(
-                          "Transfer piepdata %s error, skip it.", 
pipeData.getSerialNumber()));
-              continue;
-            }
-            pipe.commit();
-          }
-        } catch (SyncConnectionException e) {
-          logger.error(
-              String.format("Connect to receiver %s:%d error, because %s.", 
ipAddress, port, e));
-          // TODO: wait and retry
-        }
+      byte[] buffer = pipeData.serialize();
+      ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+      TSStatus status = serviceClient.sendPipeData(buffToSend);
+      if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.info("Transport PipeData {} Successfully", pipeData);
+      } else if (status.code == TSStatusCode.PIPESERVER_ERROR.getStatusCode()) 
{
+        logger.error("Receiver failed to load PipeData {}, skip it.", 
pipeData);
+        return false;
       }
-    } catch (InterruptedException e) {
-      logger.info("Interrupted by pipe, exit transport.");
-    } finally {
-      close();
+    } catch (TException e) {
+      throw new SyncConnectionException(e);
     }
+    return true;
   }
 
+  @Override
   public void close() {
-    serviceClient.close();
-    heartbeatClient.close();
+    if (transport != null) {
+      transport.close();
+      transport = null;
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
new file mode 100644
index 0000000000..cf323c04fd
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class SenderManager {
+  private static final Logger logger = 
LoggerFactory.getLogger(SenderManager.class);
+
+  protected ITransportClient transportClient;
+  private Pipe pipe;
+  private PipeSink pipeSink;
+
+  protected ExecutorService transportExecutorService;
+  private Future transportFuture;
+
+  public SenderManager(Pipe pipe, IoTDBPipeSink pipeSink) {
+    this.pipe = pipe;
+    this.pipeSink = pipeSink;
+    this.transportExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
+    this.transportClient = TransportClientFactory.createTransportClient(pipe, 
pipeSink);
+  }
+
+  public void start() {
+    transportFuture = 
transportExecutorService.submit(this::takePipeDataAndTransport);
+  }
+
+  public void stop() {
+    if (transportFuture != null) {
+      transportFuture.cancel(true);
+    }
+  }
+
+  public boolean close() throws InterruptedException {
+    boolean isClosed;
+    transportExecutorService.shutdownNow();
+    isClosed =
+        transportExecutorService.awaitTermination(
+            SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, 
TimeUnit.MILLISECONDS);
+    return isClosed;
+  }
+
+  private void takePipeDataAndTransport() {
+    try {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          if (!transportClient.handshake()) {
+            SyncService.getInstance()
+                .receiveMsg(
+                    PipeMessage.MsgType.ERROR,
+                    String.format("Can not handshake with %s", pipeSink));
+          }
+          while (!Thread.currentThread().isInterrupted()) {
+            PipeData pipeData = pipe.take();
+            if (!transportClient.send(pipeData)) {
+              logger.error(String.format("Can not transfer pipedata %s, skip 
it.", pipeData));
+              // can do something.
+              SyncService.getInstance()
+                  .receiveMsg(
+                      PipeMessage.MsgType.WARN,
+                      String.format(
+                          "Transfer piepdata %s error, skip it.", 
pipeData.getSerialNumber()));
+              continue;
+            }
+            pipe.commit();
+          }
+        } catch (SyncConnectionException e) {
+          logger.error(String.format("Connect to receiver %s error, because 
%s.", pipeSink, e));
+          // TODO: wait and retry
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.info("Interrupted by pipe, exit transport.");
+    } finally {
+      transportClient.close();
+    }
+  }
+
+  /** test */
+  @TestOnly
+  public void setTransportClient(ITransportClient transportClient) {
+    this.transportClient = transportClient;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java
new file mode 100644
index 0000000000..cf176bb170
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+public class TransportClientFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(TransportClientFactory.class);
+
+  public static ITransportClient createTransportClient(Pipe pipe, PipeSink 
pipeSink) {
+    switch (pipeSink.getType()) {
+      case IoTDB:
+        IoTDBPipeSink ioTDBPipeSink = (IoTDBPipeSink) pipeSink;
+        return new IoTDBSinkTransportClient(
+            pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort(), 
getLocalIP(ioTDBPipeSink));
+      case ExternalPipe:
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  private static String getLocalIP(IoTDBPipeSink pipeSink) {
+    String localIP;
+    try {
+      InetAddress inetAddress = InetAddress.getLocalHost();
+      if (inetAddress.isLoopbackAddress()) {
+        try (final DatagramSocket socket = new DatagramSocket()) {
+          socket.connect(InetAddress.getByName(pipeSink.getIp()), 
pipeSink.getPort());
+          localIP = socket.getLocalAddress().getHostAddress();
+        }
+      } else {
+        localIP = inetAddress.getHostAddress();
+      }
+    } catch (UnknownHostException | SocketException e) {
+      logger.error("Get local host error when create transport handler.", e);
+      localIP = SyncConstant.UNKNOWN_IP;
+    }
+    return localIP;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java
deleted file mode 100644
index 28a27aa491..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.conf;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-
-import java.io.File;
-
-public class TransportConfig {
-  private TransportConfig() {}
-
-  /** default base dir, stores all IoTDB runtime files */
-  private static final String DEFAULT_BASE_DIR = addHomeDir("data");
-
-  private static String addHomeDir(String dir) {
-    String homeDir = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
-    if (!new File(dir).isAbsolute() && homeDir != null && homeDir.length() > 
0) {
-      if (!homeDir.endsWith(File.separator)) {
-        dir = homeDir + File.separatorChar + dir;
-      } else {
-        dir = homeDir + dir;
-      }
-    }
-    return dir;
-  }
-
-  public static boolean isCheckFileDegistAgain = false;
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index 2a970a67b9..c62220b709 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -32,36 +32,22 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.RandomAccessFile;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-
 /**
  * This class is responsible for implementing the RPC processing on the 
receiver-side. It should
  * only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
@@ -70,13 +56,13 @@ public class ReceiverManager {
   private static Logger logger = 
LoggerFactory.getLogger(ReceiverManager.class);
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final String RECORD_SUFFIX = ".record";
-  private static final String PATCH_SUFFIX = ".patch";
 
   // When the client abnormally exits, we can still know who to disconnect
   private final ThreadLocal<Long> currentConnectionId;
   // Record the remote message for every rpc connection
   private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
+  // Record the remote message for every rpc connection
+  private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
 
   // The sync connectionId is unique in one IoTDB instance.
   private final AtomicLong connectionIdGenerator;
@@ -84,6 +70,7 @@ public class ReceiverManager {
   public ReceiverManager() {
     currentConnectionId = new ThreadLocal<>();
     connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
+    connectionIdToStartIndexRecord = new ConcurrentHashMap<>();
     connectionIdGenerator = new AtomicLong();
   }
 
@@ -108,52 +95,50 @@ public class ReceiverManager {
   }
 
   private CheckResult checkStartIndexValid(File file, long startIndex) throws 
IOException {
-    File recordFile = new File(file.getAbsolutePath() + RECORD_SUFFIX);
-
-    if (!recordFile.exists() && startIndex != 0) {
+    // get local index from memory map
+    long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
+    // get local index from file
+    if (localIndex < 0 && file.exists()) {
+      localIndex = file.length();
+      recordStartIndex(file, localIndex);
+    }
+    // compare and check
+    if (localIndex < 0 && startIndex != 0) {
       logger.error(
           "The start index {} of data sync is not valid. "
-              + "The file {} is not exist and start index should equal to 0).",
-          startIndex,
-          recordFile.getAbsolutePath());
+              + "The file is not exist and start index should equal to 0).",
+          startIndex);
       return new CheckResult(false, "0");
+    } else if (localIndex >= 0 && localIndex != startIndex) {
+      logger.error(
+          "The start index {} of data sync is not valid. "
+              + "The start index of the file should equal to {}.",
+          startIndex,
+          localIndex);
+      return new CheckResult(false, String.valueOf(localIndex));
     }
+    return new CheckResult(true, "0");
+  }
 
-    if (recordFile.exists()) {
-      try (InputStream inputStream = new FileInputStream(recordFile);
-          BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(inputStream))) {
-        String index = bufferedReader.readLine();
-
-        if ((index == null) || (index.length() == 0)) {
-          if (startIndex != 0) {
-            logger.error(
-                "The start index {} of data sync is not valid. "
-                    + "The file {} is not exist and start index is should 
equal to 0.",
-                startIndex,
-                recordFile.getAbsolutePath());
-            return new CheckResult(false, "0");
-          }
-        }
-
-        if (Long.parseLong(index) != startIndex) {
-          logger.error(
-              "The start index {} of data sync is not valid. "
-                  + "The start index of the file {} should equal to {}.",
-              startIndex,
-              recordFile.getAbsolutePath(),
-              index);
-          return new CheckResult(false, index);
-        }
-      }
+  private void recordStartIndex(File file, long position) {
+    Long id = currentConnectionId.get();
+    if (id != null) {
+      Map<String, Long> map =
+          connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new 
ConcurrentHashMap<>());
+      map.put(file.getAbsolutePath(), position);
     }
-
-    return new CheckResult(true, "0");
   }
 
   // endregion
 
   // region Interfaces and Implementation of RPC Handler
 
+  /**
+   * Create connection from sender
+   *
+   * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to connect; {@link
+   *     TSStatusCode#SUCCESS_STATUS} if success to connect.
+   */
   public TSStatus handshake(TSyncIdentityInfo identityInfo) {
     logger.debug("Invoke handshake method from client ip = {}", 
identityInfo.address);
     // check ip address
@@ -218,157 +203,114 @@ public class ReceiverManager {
     return ipAddressBinary.equals(ipSegmentBinary);
   }
 
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff, ByteBuffer digest)
-      throws TException {
+  /**
+   * Receive {@link PipeData} and load it into IoTDB Engine.
+   *
+   * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; 
{@link
+   *     TSStatusCode#SUCCESS_STATUS} if load successfully.
+   * @throws TException The connection between the sender and the receiver has 
not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke transportData method from client ip = {}", 
identityInfo.address);
-
+    logger.debug("Invoke transportPipeData method from client ip = {}", 
identityInfo.address);
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    TSyncTransportType type = metaInfo.type;
-    String fileName = metaInfo.fileName;
-    long startIndex = metaInfo.startIndex;
 
-    // Check file start index valid
-    if (type == TSyncTransportType.FILE) {
-      try {
-        CheckResult result = checkStartIndexValid(new File(fileDir, fileName), 
startIndex);
-        if (!result.isResult()) {
-          return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, 
result.getIndex());
-        }
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, 
e.getMessage());
-      }
-    }
-
-    // Check buff digest
-    int pos = buff.position();
-    MessageDigest messageDigest = null;
+    // step2. deserialize PipeData
+    PipeData pipeData;
     try {
-      messageDigest = MessageDigest.getInstance("SHA-256");
-    } catch (NoSuchAlgorithmException e) {
-      logger.error(e.getMessage());
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
-    }
-    messageDigest.update(buff);
-    byte[] digestBytes = new byte[digest.capacity()];
-    digest.get(digestBytes);
-    if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest 
check error");
-    }
-
-    if (type != TSyncTransportType.FILE) {
-      buff.position(pos);
       int length = buff.capacity();
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
-      try {
-        PipeData pipeData = PipeData.createPipeData(byteArray);
-        if (type == TSyncTransportType.TSFILE) {
-          // Do with file
-          handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
-        }
-        logger.info(
-            "Start load pipeData with serialize number {} and type 
{},value={}",
-            pipeData.getSerialNumber(),
-            pipeData.getType(),
-            pipeData);
-        pipeData.createLoader().load();
-        logger.info(
-            "Load pipeData with serialize number {} successfully.", 
pipeData.getSerialNumber());
-      } catch (IOException | IllegalPathException e) {
-        logger.error("Pipe data transport error, {}", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " + 
e.getMessage());
-      } catch (PipeDataLoadException e) {
-        logger.error("Fail to load pipeData because {}.", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " + 
e.getMessage());
-      }
-    } else {
-      // Write buff to {file}.patch
-      buff.position(pos);
-      File file = new File(fileDir, fileName + PATCH_SUFFIX);
-      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw")) {
-        randomAccessFile.seek(startIndex);
-        int length = buff.capacity();
-        byte[] byteArray = new byte[length];
-        buff.get(byteArray);
-        randomAccessFile.write(byteArray);
-        writeRecordFile(new File(fileDir, fileName + RECORD_SUFFIX), 
startIndex + length);
-        logger.debug(
-            "Sync "
-                + fileName
-                + " start at "
-                + startIndex
-                + " to "
-                + (startIndex + length)
-                + " is done.");
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, 
e.getMessage());
+      pipeData = PipeData.createPipeData(byteArray);
+      if (pipeData instanceof TsFilePipeData) {
+        handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
       }
+    } catch (IOException | IllegalPathException e) {
+      logger.error("Pipe data transport error, {}", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + 
e.getMessage());
     }
+
+    // step3. load PipeData
+    logger.info(
+        "Start load pipeData with serialize number {} and type {},value={}",
+        pipeData.getSerialNumber(),
+        pipeData.getType(),
+        pipeData);
+    try {
+      pipeData.createLoader().load();
+      logger.info(
+          "Load pipeData with serialize number {} successfully.", 
pipeData.getSerialNumber());
+    } catch (PipeDataLoadException e) {
+      logger.error("Fail to load pipeData because {}.", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + 
e.getMessage());
+    }
+
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer 
digest)
+  /**
+   * Receive TsFile based on startIndex.
+   *
+   * @return {@link TSStatusCode#SUCCESS_STATUS} if receive successfully; 
{@link
+   *     TSStatusCode#SYNC_FILE_REBASE} if startIndex needs to rollback 
because mismatched; {@link
+   *     TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
+   * @throws TException The connection between the sender and the receiver has 
not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer 
buff)
       throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke checkFileDigest method from client ip = {}", 
identityInfo.address);
+    logger.debug("Invoke transportData method from client ip = {}", 
identityInfo.address);
+
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    synchronized (fileDir.intern()) {
-      String fileName = metaInfo.fileName;
-      MessageDigest messageDigest = null;
-      try {
-        messageDigest = MessageDigest.getInstance("SHA-256");
-      } catch (NoSuchAlgorithmException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, 
e.getMessage());
-      }
+    String fileName = metaInfo.fileName;
+    long startIndex = metaInfo.startIndex;
+    File file = new File(fileDir, fileName + SyncConstant.PATCH_SUFFIX);
 
-      try (InputStream inputStream =
-          new FileInputStream(new File(fileDir, fileName + PATCH_SUFFIX))) {
-        byte[] block = new byte[DATA_CHUNK_SIZE];
-        int length;
-        while ((length = inputStream.read(block)) > 0) {
-          messageDigest.update(block, 0, length);
-        }
-
-        String localDigest = (new BigInteger(1, 
messageDigest.digest())).toString(16);
-        byte[] digestBytes = new byte[digest.capacity()];
-        digest.get(digestBytes);
-        if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
-          logger.error(
-              "The file {} digest check error. "
-                  + "The local digest is {} (should be equal to {}).",
-              fileName,
-              localDigest,
-              digest);
-          new File(fileDir, fileName + RECORD_SUFFIX).delete();
-          return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "File 
digest check error.");
-        }
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, 
e.getMessage());
+    // step2. check startIndex
+    try {
+      CheckResult result = checkStartIndexValid(new File(fileDir, fileName), 
startIndex);
+      if (!result.isResult()) {
+        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, 
result.getIndex());
       }
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+    }
 
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
+    // step3. append file
+    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) 
{
+      int length = buff.capacity();
+      randomAccessFile.seek(startIndex);
+      byte[] byteArray = new byte[length];
+      buff.get(byteArray);
+      randomAccessFile.write(byteArray);
+      recordStartIndex(new File(fileDir, fileName), startIndex + length);
+      logger.debug(
+          "Sync "
+              + fileName
+              + " start at "
+              + startIndex
+              + " to "
+              + (startIndex + length)
+              + " is done.");
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
     }
-  }
 
-  private void writeRecordFile(File recordFile, long position) throws 
IOException {
-    File tmpFile = new File(recordFile.getAbsolutePath() + ".tmp");
-    FileWriter fileWriter = new FileWriter(tmpFile, false);
-    fileWriter.write(String.valueOf(position));
-    fileWriter.close();
-    Files.move(tmpFile.toPath(), recordFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
   /**
@@ -382,7 +324,9 @@ public class ReceiverManager {
     String tsFileName = tsFilePipeData.getTsFileName();
     File dir = new File(fileDir);
     File[] targetFiles =
-        dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && 
name.endsWith(PATCH_SUFFIX));
+        dir.listFiles(
+            (dir1, name) ->
+                name.startsWith(tsFileName) && 
name.endsWith(SyncConstant.PATCH_SUFFIX));
     if (targetFiles != null) {
       for (File targetFile : targetFiles) {
         File newFile =
@@ -390,18 +334,12 @@ public class ReceiverManager {
                 dir,
                 targetFile
                     .getName()
-                    .substring(0, targetFile.getName().length() - 
PATCH_SUFFIX.length()));
+                    .substring(
+                        0, targetFile.getName().length() - 
SyncConstant.PATCH_SUFFIX.length()));
         targetFile.renameTo(newFile);
       }
     }
     tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
-    File recordFile = new File(fileDir, tsFileName + RECORD_SUFFIX);
-    try {
-      Files.deleteIfExists(recordFile.toPath());
-    } catch (IOException e) {
-      logger.warn(
-          String.format("Delete record file %s error, because %s.", 
recordFile.getPath(), e));
-    }
   }
 
   // endregion
@@ -427,6 +365,22 @@ public class ReceiverManager {
     }
   }
 
+  /**
+   * Get current TSyncIdentityInfo
+   *
+   * @return startIndex of file: -1 if file doesn't exist
+   */
+  private long getCurrentFileStartIndex(String absolutePath) {
+    Long id = currentConnectionId.get();
+    if (id != null) {
+      Map<String, Long> map = connectionIdToStartIndexRecord.get(id);
+      if (map != null && map.containsKey(absolutePath)) {
+        return map.get(absolutePath);
+      }
+    }
+    return -1;
+  }
+
   private void createConnection(TSyncIdentityInfo identityInfo) {
     long connectionId = connectionIdGenerator.incrementAndGet();
     currentConnectionId.set(connectionId);
@@ -440,6 +394,7 @@ public class ReceiverManager {
     if (checkConnection()) {
       long id = currentConnectionId.get();
       connectionIdToIdentityInfoMap.remove(id);
+      connectionIdToIdentityInfoMap.remove(id);
       currentConnectionId.remove();
     }
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index fc68a92324..6ea46e968d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.SyncInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
 import org.junit.After;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index 0d53ecf0dc..26919b948e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
 import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
index afb932cf27..d84a2933e6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
@@ -19,8 +19,11 @@
  */
 package org.apache.iotdb.db.sync.transport;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -33,6 +36,12 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.session.util.Version;
@@ -42,17 +51,27 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 public class SyncTransportTest {
+
+  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   /** create tsfile and move to tmpDir for sync test */
   File tmpDir = new File("target/synctest");
 
@@ -61,6 +80,10 @@ public class SyncTransportTest {
   long createdTime1 = System.currentTimeMillis();
   File fileDir;
 
+  File tsfile;
+  File resourceFile;
+  File modsFile;
+
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
@@ -83,6 +106,19 @@ public class SyncTransportTest {
       FileUtils.deleteDirectory(tmpDir);
     }
     FileUtils.moveDirectory(srcDir, tmpDir);
+    tsfile = null;
+    resourceFile = null;
+    modsFile = null;
+    File[] fileList = tmpDir.listFiles();
+    for (File f : fileList) {
+      if (f.getName().endsWith(".tsfile")) {
+        tsfile = f;
+      } else if (f.getName().endsWith(".mods")) {
+        modsFile = f;
+      } else if (f.getName().endsWith(".resource")) {
+        resourceFile = f;
+      }
+    }
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.envSetUp();
   }
@@ -94,21 +130,130 @@ public class SyncTransportTest {
   }
 
   @Test
-  public void test() throws Exception {
-    // 1. prepare fake file
-    File[] fileList = tmpDir.listFiles();
-    File tsfile = null;
-    File resourceFile = null;
-    File modsFile = null;
-    for (File f : fileList) {
-      if (f.getName().endsWith(".tsfile")) {
-        tsfile = f;
-      } else if (f.getName().endsWith(".mods")) {
-        modsFile = f;
-      } else if (f.getName().endsWith(".resource")) {
-        resourceFile = f;
+  public void testTransportFile() throws Exception {
+    TSyncIdentityInfo identityInfo =
+        new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1, 
config.getIoTDBVersion());
+    try (TTransport transport =
+        RpcTransportFactory.INSTANCE.getTransport(
+            new TSocket(
+                TConfigurationConst.defaultTConfiguration,
+                "127.0.0.1",
+                6667,
+                SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+      TProtocol protocol;
+      if (config.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
+      }
+      IClientRPCService.Client serviceClient = new 
IClientRPCService.Client(protocol);
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
+      }
+      byte[] buffer = new byte[10];
+      try (RandomAccessFile randomAccessFile = new RandomAccessFile(tsfile, 
"rw")) {
+        // no handshake, response TException
+        try {
+          serviceClient.sendFile(
+              new TSyncTransportMetaInfo(tsfile.getName(), 0), 
ByteBuffer.wrap(buffer));
+          Assert.fail();
+        } catch (TException e) {
+          // do nothing
+        }
+        serviceClient.handshake(identityInfo);
+        // response REBASE:0
+        randomAccessFile.read(buffer, 0, 10);
+        TSStatus tsStatus1 =
+            serviceClient.sendFile(
+                new TSyncTransportMetaInfo(tsfile.getName(), 1), 
ByteBuffer.wrap(buffer));
+        Assert.assertEquals(tsStatus1.getCode(), 
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+        Assert.assertEquals(tsStatus1.getMessage(), "0");
+        // response SUCCESS
+        TSStatus tsStatus2 =
+            serviceClient.sendFile(
+                new TSyncTransportMetaInfo(tsfile.getName(), 0), 
ByteBuffer.wrap(buffer));
+        Assert.assertEquals(tsStatus2.getCode(), 
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        // response response REBASE:10
+        TSStatus tsStatus3 =
+            serviceClient.sendFile(
+                new TSyncTransportMetaInfo(tsfile.getName(), 0), 
ByteBuffer.wrap(buffer));
+        Assert.assertEquals(tsStatus3.getCode(), 
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+        Assert.assertEquals(tsStatus3.getMessage(), "10");
+        TSStatus tsStatus4 =
+            serviceClient.sendFile(
+                new TSyncTransportMetaInfo(tsfile.getName(), 100), 
ByteBuffer.wrap(buffer));
+        Assert.assertEquals(tsStatus4.getCode(), 
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+        Assert.assertEquals(tsStatus4.getMessage(), "10");
+        // response SUCCESS
+        byte[] remainBuffer = new byte[(int) (randomAccessFile.length() - 10)];
+        randomAccessFile.read(remainBuffer, 0, (int) 
(randomAccessFile.length() - 10));
+        TSStatus tsStatus5 =
+            serviceClient.sendFile(
+                new TSyncTransportMetaInfo(tsfile.getName(), 10), 
ByteBuffer.wrap(remainBuffer));
+        Assert.assertEquals(tsStatus5.getCode(), 
TSStatusCode.SUCCESS_STATUS.getStatusCode());
       }
     }
+    // check completeness of file
+    File receiveFile =
+        new File(
+            SyncPathUtil.getFileDataDirPath(identityInfo),
+            tsfile.getName() + SyncConstant.PATCH_SUFFIX);
+    Assert.assertTrue(receiveFile.exists());
+
+    try (RandomAccessFile originFileRAF = new RandomAccessFile(tsfile, "r");
+        RandomAccessFile receiveFileRAF = new RandomAccessFile(receiveFile, 
"r")) {
+      Assert.assertEquals(originFileRAF.length(), receiveFileRAF.length());
+      byte[] buffer1 = new byte[(int) originFileRAF.length()];
+      byte[] buffer2 = new byte[(int) receiveFile.length()];
+      originFileRAF.read(buffer1);
+      receiveFileRAF.read(buffer2);
+      Assert.assertArrayEquals(buffer1, buffer2);
+    }
+  }
+
+  @Test
+  public void testTransportPipeData() throws Exception {
+    try (TTransport transport =
+        RpcTransportFactory.INSTANCE.getTransport(
+            new TSocket(
+                TConfigurationConst.defaultTConfiguration,
+                "127.0.0.1",
+                6667,
+                SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+      TProtocol protocol;
+      if (config.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
+      }
+      IClientRPCService.Client serviceClient = new 
IClientRPCService.Client(protocol);
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
+      }
+      PipeData pipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new 
PartialPath("root.sg1")), 0);
+      byte[] buffer = pipeData.serialize();
+      ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+      try {
+        TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+        Assert.fail();
+      } catch (TException e) {
+        // do nothing
+      }
+      serviceClient.handshake(
+          new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1, 
config.getIoTDBVersion()));
+      TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+      Assert.assertEquals(tsStatus.getCode(), 
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+  }
+
+  @Test
+  public void testTransportClient() throws Exception {
+    // 1. prepare fake file
     Assert.assertNotNull(tsfile);
     Assert.assertNotNull(modsFile);
     Assert.assertNotNull(resourceFile);
@@ -136,7 +281,7 @@ public class SyncTransportTest {
             pipe, "127.0.0.1", 
IoTDBDescriptor.getInstance().getConfig().getRpcPort(), "127.0.0.1");
     client.handshake();
     for (PipeData pipeData : pipeDataList) {
-      client.senderTransport(pipeData);
+      client.send(pipeData);
     }
 
     // 4. check result
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 2eb1ef7a52..b7d095e5c7 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -71,8 +71,7 @@ public enum TSStatusCode {
   TEMPLATE_NOT_EXIST(339),
   CREATE_TEMPLATE_ERROR(340),
   SYNC_FILE_REBASE(341),
-  SYNC_FILE_RETRY(342),
-  SYNC_FILE_ERROR(343),
+  SYNC_FILE_ERROR(342),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/thrift/src/main/thrift/client.thrift 
b/thrift/src/main/thrift/client.thrift
index 014bda77dd..7405beae93 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -420,20 +420,11 @@ struct TSyncIdentityInfo{
   4:required string version
 }
 
-enum TSyncTransportType {
-  TSFILE,
-  DELETION,
-  PHYSICALPLAN,
-  FILE
-}
-
 struct TSyncTransportMetaInfo{
-  // The type of the pipeData in sending.
-  1:required TSyncTransportType type
   // The name of the file in sending.
-  2:required string fileName
+  1:required string fileName
   // The start index of the file slice in sending.
-  3:required i64 startIndex
+  2:required i64 startIndex
 }
 
 service IClientRPCService {
@@ -529,7 +520,7 @@ service IClientRPCService {
 
   common.TSStatus handshake(TSyncIdentityInfo info);
 
-  common.TSStatus transportData(1:TSyncTransportMetaInfo metaInfo, 2:binary 
buff, 3:binary digest);
+  common.TSStatus sendPipeData(1:binary buff);
 
-  common.TSStatus checkFileDigest(1:TSyncTransportMetaInfo metaInfo, 2:binary 
digest);
+  common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
 }
\ No newline at end of file

Reply via email to