This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch remove-sync-entry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f76c722e2e5e8ac971c0584850e9e1aa43584b3c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 17 04:20:24 2023 +0800

    refactor
---
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   2 +-
 .../config/constant/PipeConnectorConstant.java     |   3 +
 .../pipe/connector/legacy/IoTDBSyncConnector.java  |  20 +-
 .../pipe/connector/legacy/IoTDBSyncReceiver.java   | 395 ++++++++-----------
 .../legacy/exception/SyncDataLoadException.java    |  28 --
 .../{pipedata/load => loader}/DeletionLoader.java  |  16 +-
 .../legacy/{pipedata/load => loader}/ILoader.java  |   2 +-
 .../{pipedata/load => loader}/TsFileLoader.java    |  13 +-
 .../legacy/pipedata/DeletionPipeData.java          |   4 +-
 .../pipe/connector/legacy/pipedata/PipeData.java   |  11 +-
 .../connector/legacy/pipedata/TsFilePipeData.java  |   4 +-
 .../pipedata/queue/BufferedPipeDataQueue.java      | 426 ---------------------
 .../legacy/pipedata/queue/PipeDataQueue.java       |  41 --
 .../pipe/connector/legacy/utils/SyncConstant.java  |  24 --
 .../pipe/connector/legacy/utils/SyncPathUtil.java  |  17 -
 15 files changed, 205 insertions(+), 801 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index ab7df92e591..aaeb3fde212 100644
--- a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-import static 
org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader.SCHEMA_FETCHER;
+import static 
org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader.SCHEMA_FETCHER;
 
 public class AuditLogger {
   private static final Logger logger = 
LoggerFactory.getLogger(AuditLogger.class);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index d0b9002d2d4..24bd5a51e75 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -33,6 +33,9 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
   public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
 
+  public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = 
"connector.version";
+  public static final String 
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
index a7ebbecff51..b4f48208c72 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
@@ -61,6 +61,8 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
 
@@ -69,7 +71,6 @@ public class IoTDBSyncConnector implements PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncConnector.class);
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
-  public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1";
 
   private String ipAddress;
   private int port;
@@ -77,6 +78,8 @@ public class IoTDBSyncConnector implements PipeConnector {
   private String user;
   private String password;
 
+  private String syncConnectorVersion;
+
   private String pipeName;
   private Long creationTime;
 
@@ -94,15 +97,20 @@ public class IoTDBSyncConnector implements PipeConnector {
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
-    this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
-    this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+    ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
+    port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
 
-    this.user =
+    user =
         parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
-    this.password =
+    password =
         parameters.getStringOrDefault(
             CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
+    syncConnectorVersion =
+        parameters.getStringOrDefault(
+            CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY,
+            CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE);
+
     pipeName = configuration.getRuntimeEnvironment().getPipeName();
     creationTime = configuration.getRuntimeEnvironment().getCreationTime();
   }
@@ -123,7 +131,7 @@ public class IoTDBSyncConnector implements PipeConnector {
     try {
       final TSyncIdentityInfo identityInfo =
           new TSyncIdentityInfo(
-              pipeName, creationTime, IOTDB_SYNC_CONNECTOR_VERSION, 
IoTDBConstant.PATH_ROOT);
+              pipeName, creationTime, syncConnectorVersion, 
IoTDBConstant.PATH_ROOT);
       final TSStatus status = client.handshake(identityInfo);
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         String errorMsg =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
index 79480a4ea2c..5189a3635ba 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
@@ -22,20 +22,19 @@ package org.apache.iotdb.db.pipe.connector.legacy;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
-import 
org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException;
 import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData;
 import org.apache.iotdb.db.pipe.connector.legacy.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo;
 import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant;
 import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncPathUtil;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
@@ -50,9 +49,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -60,96 +56,38 @@ import java.util.concurrent.atomic.AtomicLong;
 /** This class is responsible for implementing the RPC processing on the 
receiver-side. */
 public class IoTDBSyncReceiver {
 
-  public static IoTDBSyncReceiver getInstance() {
-    return IoTDBSyncReceiverHolder.INSTANCE;
-  }
-
-  private static class IoTDBSyncReceiverHolder {
-    private static final IoTDBSyncReceiver INSTANCE = new IoTDBSyncReceiver();
-  }
-
-  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBSyncReceiver.class);
-
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncReceiver.class);
 
   // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currentConnectionId;
-  // Record the remote message for every rpc connection
-  private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap;
-  // Record the remote message for every rpc connection
-  private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
-  private final Map<String, String> registeredDatabase;
-
-  // The sync connectionId is unique in one IoTDB instance.
-  private final AtomicLong connectionIdGenerator;
-
-  private IoTDBSyncReceiver() {
-    currentConnectionId = new ThreadLocal<>();
-    connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
-    connectionIdToStartIndexRecord = new ConcurrentHashMap<>();
-    registeredDatabase = new ConcurrentHashMap<>();
-    connectionIdGenerator = new AtomicLong();
-  }
+  private final ThreadLocal<Long> currentConnectionId = new ThreadLocal<>();
 
-  // region Interfaces and Implementation of Index Checker
-
-  private class CheckResult {
-    boolean result;
-    String index;
+  // Record the remote message for every rpc connection
+  private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap =
+      new ConcurrentHashMap<>();
 
-    public CheckResult(boolean result, String index) {
-      this.result = result;
-      this.index = index;
-    }
+  // Record the remote message for every rpc connection
+  private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord =
+      new ConcurrentHashMap<>();
 
-    public boolean isResult() {
-      return result;
-    }
+  private final Map<String, String> registeredDatabase = new 
ConcurrentHashMap<>();
 
-    public String getIndex() {
-      return index;
-    }
-  }
+  // The sync connectionId is unique in one IoTDB instance.
+  private final AtomicLong connectionIdGenerator = new AtomicLong();
 
-  private CheckResult checkStartIndexValid(File file, long startIndex) throws 
IOException {
-    // get local index from memory map
-    long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
-    // get local index from file
-    if (localIndex < 0 && file.exists()) {
-      localIndex = file.length();
-      recordStartIndex(file, localIndex);
-    }
-    // compare and check
-    if (localIndex < 0 && startIndex != 0) {
-      logger.error(
-          "The start index {} of data sync is not valid. "
-              + "The file is not exist and start index should equal to 0).",
-          startIndex);
-      return new CheckResult(false, "0");
-    } else if (localIndex >= 0 && localIndex != startIndex) {
-      logger.error(
-          "The start index {} of data sync is not valid. "
-              + "The start index of the file should equal to {}.",
-          startIndex,
-          localIndex);
-      return new CheckResult(false, String.valueOf(localIndex));
-    }
-    return new CheckResult(true, "0");
-  }
+  ////////////// Interfaces and Implementation of RPC Handler 
////////////////////////
 
-  private void recordStartIndex(File file, long position) {
-    Long id = currentConnectionId.get();
-    if (id != null) {
-      Map<String, Long> map =
-          connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new 
ConcurrentHashMap<>());
-      map.put(file.getAbsolutePath(), position);
+  /**
+   * release resources or cleanup when a client (a sender) is disconnected 
(normally or abnormally).
+   */
+  public void handleClientExit() {
+    if (currentConnectionId.get() != null) {
+      long id = currentConnectionId.get();
+      connectionIdToIdentityInfoMap.remove(id);
+      connectionIdToStartIndexRecord.remove(id);
+      currentConnectionId.remove();
     }
   }
 
-  // endregion
-
-  // region Interfaces and Implementation of RPC Handler
-
   /**
    * Create connection from sender
    *
@@ -162,15 +100,7 @@ public class IoTDBSyncReceiver {
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     SyncIdentityInfo identityInfo = new SyncIdentityInfo(tIdentityInfo, 
remoteAddress);
-    logger.info("Invoke handshake method from client ip = {}", 
identityInfo.getRemoteAddress());
-    // Version check
-    if 
(!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion()))
 {
-      return RpcUtils.getStatus(
-          TSStatusCode.PIPESERVER_ERROR,
-          String.format(
-              "version mismatch: the sender <%s>, the receiver <%s>",
-              identityInfo.version, config.getIoTDBVersion()));
-    }
+    LOGGER.info("Invoke handshake method from client ip = {}", 
identityInfo.getRemoteAddress());
 
     if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
       new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
@@ -186,44 +116,45 @@ public class IoTDBSyncReceiver {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  /**
-   * Verify IP address with IP white list which contains more than one IP 
segment. It's used by sync
-   * sender.
-   */
-  private boolean verifyIPSegment(String ipWhiteList, String ipAddress) {
-    String[] ipSegments = ipWhiteList.split(",");
-    for (String IPsegment : ipSegments) {
-      int subnetMask = 
Integer.parseInt(IPsegment.substring(IPsegment.indexOf('/') + 1));
-      IPsegment = IPsegment.substring(0, IPsegment.indexOf('/'));
-      if (verifyIP(IPsegment, ipAddress, subnetMask)) {
-        return true;
-      }
-    }
-    return false;
+  private void createConnection(SyncIdentityInfo identityInfo) {
+    long connectionId = connectionIdGenerator.incrementAndGet();
+    currentConnectionId.set(connectionId);
+    connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
   }
 
-  /** Verify IP address with IP segment. */
-  private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) 
{
-    String ipSegmentBinary;
-    String ipAddressBinary;
-    String[] ipSplits = ipSegment.split(SyncConstant.IP_SEPARATOR);
-    DecimalFormat df = new DecimalFormat("00000000");
-    StringBuilder ipSegmentBuilder = new StringBuilder();
-    for (String IPsplit : ipSplits) {
-      ipSegmentBuilder.append(
-          
df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
+  private boolean registerDatabase(
+      String database, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
+    if (registeredDatabase.containsKey(database)) {
+      return true;
     }
-    ipSegmentBinary = ipSegmentBuilder.toString();
-    ipSegmentBinary = ipSegmentBinary.substring(0, subnetMark);
-    ipSplits = ipAddress.split(SyncConstant.IP_SEPARATOR);
-    StringBuilder ipAddressBuilder = new StringBuilder();
-    for (String IPsplit : ipSplits) {
-      ipAddressBuilder.append(
-          
df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
+    try {
+      DatabaseSchemaStatement statement =
+          new 
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+      statement.setDatabasePath(new PartialPath(database));
+      long queryId = SessionManager.getInstance().requestQueryId();
+      ExecutionResult result =
+          Coordinator.getInstance()
+              .execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  partitionFetcher,
+                  schemaFetcher,
+                  
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+        LOGGER.error("Create Database error, statement: {}.", statement);
+        LOGGER.error("Create database result status : {}.", result.status);
+        return false;
+      }
+    } catch (IllegalPathException e) {
+      LOGGER.error(String.format("Parse database PartialPath %s error", 
database), e);
+      return false;
     }
-    ipAddressBinary = ipAddressBuilder.toString();
-    ipAddressBinary = ipAddressBinary.substring(0, subnetMark);
-    return ipAddressBinary.equals(ipSegmentBinary);
+
+    registeredDatabase.put(database, "");
+    return true;
   }
 
   /**
@@ -240,7 +171,7 @@ public class IoTDBSyncReceiver {
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug(
+    LOGGER.debug(
         "Invoke transportPipeData method from client ip = {}", 
identityInfo.getRemoteAddress());
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
 
@@ -257,23 +188,23 @@ public class IoTDBSyncReceiver {
         handleTsFilePipeData(tsFilePipeData, fileDir);
       }
     } catch (IOException | IllegalPathException e) {
-      logger.error("Pipe data transport error, {}", e.getMessage());
+      LOGGER.error("Pipe data transport error, {}", e.getMessage());
       return RpcUtils.getStatus(
           TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + 
e.getMessage());
     }
 
     // step3. load PipeData
-    logger.info(
+    LOGGER.info(
         "Start load pipeData with serialize number {} and type {},value={}",
         pipeData.getSerialNumber(),
         pipeData.getPipeDataType(),
         pipeData);
     try {
       pipeData.createLoader().load();
-      logger.info(
+      LOGGER.info(
           "Load pipeData with serialize number {} successfully.", 
pipeData.getSerialNumber());
-    } catch (SyncDataLoadException e) {
-      logger.error("Fail to load pipeData because {}.", e.getMessage());
+    } catch (PipeException e) {
+      LOGGER.error("Fail to load pipeData because {}.", e.getMessage());
       return RpcUtils.getStatus(
           TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + 
e.getMessage());
     }
@@ -281,6 +212,49 @@ public class IoTDBSyncReceiver {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
+  /**
+   * Get current SyncIdentityInfo
+   *
+   * @return null if connection has been exited
+   */
+  private SyncIdentityInfo getCurrentSyncIdentityInfo() {
+    Long id = currentConnectionId.get();
+    if (id != null) {
+      return connectionIdToIdentityInfoMap.get(id);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * handle when successfully receive tsFilePipeData. Rename .patch file and 
reset tsFilePipeData's
+   * path.
+   *
+   * @param tsFilePipeData pipeData
+   * @param fileDir path of file data dir
+   */
+  private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String 
fileDir) {
+    String tsFileName = tsFilePipeData.getTsFileName();
+    File dir = new File(fileDir);
+    File[] targetFiles =
+        dir.listFiles(
+            (dir1, name) ->
+                name.startsWith(tsFileName) && 
name.endsWith(SyncConstant.PATCH_SUFFIX));
+    if (targetFiles != null) {
+      for (File targetFile : targetFiles) {
+        File newFile =
+            new File(
+                dir,
+                targetFile
+                    .getName()
+                    .substring(
+                        0, targetFile.getName().length() - 
SyncConstant.PATCH_SUFFIX.length()));
+        targetFile.renameTo(newFile);
+      }
+    }
+    tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
+  }
+
   /**
    * Receive TsFile based on startIndex.
    *
@@ -297,7 +271,7 @@ public class IoTDBSyncReceiver {
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug(
+    LOGGER.debug(
         "Invoke transportData method from client ip = {}", 
identityInfo.getRemoteAddress());
 
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
@@ -312,7 +286,7 @@ public class IoTDBSyncReceiver {
         return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, 
result.getIndex());
       }
     } catch (IOException e) {
-      logger.error(e.getMessage());
+      LOGGER.error(e.getMessage());
       return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
     }
 
@@ -324,7 +298,7 @@ public class IoTDBSyncReceiver {
       buff.get(byteArray);
       randomAccessFile.write(byteArray);
       recordStartIndex(new File(fileDir, fileName), startIndex + length);
-      logger.debug(
+      LOGGER.debug(
           "Sync "
               + fileName
               + " start at "
@@ -333,63 +307,37 @@ public class IoTDBSyncReceiver {
               + (startIndex + length)
               + " is done.");
     } catch (IOException e) {
-      logger.error(e.getMessage());
+      LOGGER.error(e.getMessage());
       return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
     }
 
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  /**
-   * handle when successfully receive tsFilePipeData. Rename .patch file and 
reset tsFilePipeData's
-   * path.
-   *
-   * @param tsFilePipeData pipeData
-   * @param fileDir path of file data dir
-   */
-  private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String 
fileDir) {
-    String tsFileName = tsFilePipeData.getTsFileName();
-    File dir = new File(fileDir);
-    File[] targetFiles =
-        dir.listFiles(
-            (dir1, name) ->
-                name.startsWith(tsFileName) && 
name.endsWith(SyncConstant.PATCH_SUFFIX));
-    if (targetFiles != null) {
-      for (File targetFile : targetFiles) {
-        File newFile =
-            new File(
-                dir,
-                targetFile
-                    .getName()
-                    .substring(
-                        0, targetFile.getName().length() - 
SyncConstant.PATCH_SUFFIX.length()));
-        targetFile.renameTo(newFile);
-      }
+  private CheckResult checkStartIndexValid(File file, long startIndex) throws 
IOException {
+    // get local index from memory map
+    long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
+    // get local index from file
+    if (localIndex < 0 && file.exists()) {
+      localIndex = file.length();
+      recordStartIndex(file, localIndex);
     }
-    tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
-  }
-
-  // endregion
-
-  // region Interfaces and Implementation of Connection Manager
-
-  /** Check if the connection is legally established by handshaking */
-  private boolean checkConnection() {
-    return currentConnectionId.get() != null;
-  }
-
-  /**
-   * Get current SyncIdentityInfo
-   *
-   * @return null if connection has been exited
-   */
-  private SyncIdentityInfo getCurrentSyncIdentityInfo() {
-    Long id = currentConnectionId.get();
-    if (id != null) {
-      return connectionIdToIdentityInfoMap.get(id);
-    } else {
-      return null;
+    // compare and check
+    if (localIndex < 0 && startIndex != 0) {
+      LOGGER.error(
+          "The start index {} of data sync is not valid. "
+              + "The file is not exist and start index should equal to 0).",
+          startIndex);
+      return new CheckResult(false, "0");
+    } else if (localIndex >= 0 && localIndex != startIndex) {
+      LOGGER.error(
+          "The start index {} of data sync is not valid. "
+              + "The start index of the file should equal to {}.",
+          startIndex,
+          localIndex);
+      return new CheckResult(false, String.valueOf(localIndex));
     }
+    return new CheckResult(true, "0");
   }
 
   /**
@@ -408,62 +356,43 @@ public class IoTDBSyncReceiver {
     return -1;
   }
 
-  private void createConnection(SyncIdentityInfo identityInfo) {
-    long connectionId = connectionIdGenerator.incrementAndGet();
-    currentConnectionId.set(connectionId);
-    connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
+  private void recordStartIndex(File file, long position) {
+    Long id = currentConnectionId.get();
+    if (id != null) {
+      Map<String, Long> map =
+          connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new 
ConcurrentHashMap<>());
+      map.put(file.getAbsolutePath(), position);
+    }
   }
 
-  private boolean registerDatabase(
-      String database, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
-    if (registeredDatabase.containsKey(database)) {
-      return true;
-    }
-    try {
-      DatabaseSchemaStatement statement =
-          new 
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
-      statement.setDatabasePath(new PartialPath(database));
-      long queryId = SessionManager.getInstance().requestQueryId();
-      ExecutionResult result =
-          Coordinator.getInstance()
-              .execute(
-                  statement,
-                  queryId,
-                  null,
-                  "",
-                  partitionFetcher,
-                  schemaFetcher,
-                  
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
-      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        logger.error("Create Database error, statement: {}.", statement);
-        logger.error("Create database result status : {}.", result.status);
-        return false;
-      }
-    } catch (IllegalPathException e) {
-      logger.error(String.format("Parse database PartialPath %s error", 
database), e);
-      return false;
+  private static class CheckResult {
+
+    private final boolean result;
+    private final String index;
+
+    public CheckResult(boolean result, String index) {
+      this.result = result;
+      this.index = index;
     }
 
-    registeredDatabase.put(database, "");
-    return true;
-  }
+    public boolean isResult() {
+      return result;
+    }
 
-  /**
-   * release resources or cleanup when a client (a sender) is disconnected 
(normally or abnormally).
-   */
-  public void handleClientExit() {
-    if (checkConnection()) {
-      long id = currentConnectionId.get();
-      connectionIdToIdentityInfoMap.remove(id);
-      connectionIdToStartIndexRecord.remove(id);
-      currentConnectionId.remove();
+    public String getIndex() {
+      return index;
     }
   }
 
-  public List<SyncIdentityInfo> getAllTSyncIdentityInfos() {
-    return new ArrayList<>(connectionIdToIdentityInfoMap.values());
+  ///////////////////////// Singleton /////////////////////////
+
+  private IoTDBSyncReceiver() {}
+
+  public static IoTDBSyncReceiver getInstance() {
+    return IoTDBSyncReceiverHolder.INSTANCE;
   }
 
-  // endregion
+  private static class IoTDBSyncReceiverHolder {
+    private static final IoTDBSyncReceiver INSTANCE = new IoTDBSyncReceiver();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java
deleted file mode 100644
index 0fffb488a4f..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/exception/SyncDataLoadException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.pipe.connector.legacy.exception;
-
-import org.apache.iotdb.pipe.api.exception.PipeException;
-
-public class SyncDataLoadException extends PipeException {
-
-  public SyncDataLoadException(String message) {
-    super(message);
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java
index e60a77dee5b..8445a6fdcd6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/DeletionLoader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/DeletionLoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load;
+package org.apache.iotdb.db.pipe.connector.legacy.loader;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
 import 
org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -38,9 +39,10 @@ import java.util.Collections;
 
 /** This loader is used to load deletion plan. */
 public class DeletionLoader implements ILoader {
-  private static final Logger logger = 
LoggerFactory.getLogger(DeletionLoader.class);
 
-  private Deletion deletion;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletionLoader.class);
+
+  private final Deletion deletion;
 
   public DeletionLoader(Deletion deletion) {
     this.deletion = deletion;
@@ -49,7 +51,7 @@ public class DeletionLoader implements ILoader {
   @Override
   public void load() throws SyncDataLoadException {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-      throw new SyncDataLoadException("storage engine readonly");
+      throw new PipeException("storage engine readonly");
     }
     try {
       Statement statement = generateStatement();
@@ -65,13 +67,13 @@ public class DeletionLoader implements ILoader {
                   SCHEMA_FETCHER,
                   
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logger.error("Delete {} error, statement: {}.", deletion, statement);
-        logger.error("Delete result status : {}.", result.status);
+        LOGGER.error("Delete {} error, statement: {}.", deletion, statement);
+        LOGGER.error("Delete result status : {}.", result.status);
         throw new LoadFileException(
             String.format("Can not execute delete statement: %s", statement));
       }
     } catch (Exception e) {
-      throw new SyncDataLoadException(e.getMessage());
+      throw new PipeException(e.getMessage());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java
similarity index 95%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java
index fe2518beb04..0639dd50ead 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/ILoader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/ILoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load;
+package org.apache.iotdb.db.pipe.connector.legacy.loader;
 
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java
similarity index 89%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java
index 0609dc3056d..6d0be5ab785 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/load/TsFileLoader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/loader/TsFileLoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.pipe.connector.legacy.pipedata.load;
+package org.apache.iotdb.db.pipe.connector.legacy.loader;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.pipe.connector.legacy.exception.SyncDataLoadException;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -36,7 +37,8 @@ import java.io.File;
 
 /** This loader is used to load tsFiles. If .mods file exists, it will be 
loaded as well. */
 public class TsFileLoader implements ILoader {
-  private static final Logger logger = 
LoggerFactory.getLogger(TsFileLoader.class);
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileLoader.class);
 
   private final File tsFile;
   private final String database;
@@ -49,7 +51,6 @@ public class TsFileLoader implements ILoader {
   @Override
   public void load() throws SyncDataLoadException {
     try {
-
       LoadTsFileStatement statement = new 
LoadTsFileStatement(tsFile.getAbsolutePath());
       statement.setDeleteAfterLoad(true);
       statement.setSgLevel(parseSgLevel());
@@ -68,13 +69,13 @@ public class TsFileLoader implements ILoader {
                   SCHEMA_FETCHER,
                   
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logger.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), 
statement);
-        logger.error("Load TsFile result status : {}.", result.status);
+        LOGGER.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), 
statement);
+        LOGGER.error("Load TsFile result status : {}.", result.status);
         throw new LoadFileException(
             String.format("Can not execute load TsFile statement: %s", 
statement));
       }
     } catch (Exception e) {
-      throw new SyncDataLoadException(e.getMessage());
+      throw new PipeException(e.getMessage());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
index b7162c5e00f..96fb8601dfd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.DeletionLoader;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader;
+import org.apache.iotdb.db.pipe.connector.legacy.loader.DeletionLoader;
+import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
index dae6141019d..659c69876e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.legacy.pipedata;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader;
+import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 public abstract class PipeData {
-  private static final Logger logger = LoggerFactory.getLogger(PipeData.class);
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeData.class);
 
   protected long serialNumber;
 
@@ -46,10 +47,6 @@ public abstract class PipeData {
     return serialNumber;
   }
 
-  public void setSerialNumber(long serialNumber) {
-    this.serialNumber = serialNumber;
-  }
-
   public abstract PipeDataType getPipeDataType();
 
   public long serialize(DataOutputStream stream) throws IOException {
@@ -83,7 +80,7 @@ public abstract class PipeData {
         pipeData = new DeletionPipeData();
         break;
       default:
-        logger.error("Deserialize PipeData error because Unknown type {}.", 
type);
+        LOGGER.error("Deserialize PipeData error because Unknown type {}.", 
type);
         throw new UnsupportedOperationException(
             "Deserialize PipeData error because Unknown type " + type);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
index 60882c74dd7..1f2d93d3879 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.connector.legacy.pipedata;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.ILoader;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.load.TsFileLoader;
+import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader;
+import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader;
 import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java
deleted file mode 100644
index 495a6118c02..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/BufferedPipeDataQueue.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.pipe.connector.legacy.pipedata.queue;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData;
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncConstant;
-import org.apache.iotdb.db.pipe.connector.legacy.utils.SyncPathUtil;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-
-public class BufferedPipeDataQueue implements PipeDataQueue {
-  private static final Logger logger = 
LoggerFactory.getLogger(BufferedPipeDataQueue.class);
-
-  private final String pipeLogDir;
-
-  /** input */
-  private long lastMaxSerialNumber;
-
-  private BlockingDeque<PipeData> inputDeque;
-
-  private BlockingDeque<Long> pipeLogStartNumber;
-  private DataOutputStream outputStream;
-  private long currentPipeLogSize;
-
-  /** output */
-  private final Object waitLock = new Object();
-
-  private BlockingDeque<PipeData> outputDeque;
-
-  private long pullSerialNumber;
-  private long commitSerialNumber;
-  private DataOutputStream commitLogWriter;
-  private long currentCommitLogSize;
-
-  public BufferedPipeDataQueue(String pipeLogDir) {
-    this.pipeLogDir = pipeLogDir;
-
-    this.lastMaxSerialNumber = 0;
-    this.pipeLogStartNumber = new LinkedBlockingDeque<>();
-
-    this.outputDeque = new LinkedBlockingDeque<>();
-    this.pullSerialNumber = Long.MIN_VALUE;
-    this.commitSerialNumber = Long.MIN_VALUE;
-
-    recover();
-  }
-
-  /** recover */
-  private void recover() {
-    if (!new File(pipeLogDir).exists()) {
-      return;
-    }
-
-    recoverPipeLogStartNumber();
-    recoverLastMaxSerialNumber();
-    recoverCommitSerialNumber();
-    recoverOutputDeque();
-  }
-
-  private void recoverPipeLogStartNumber() {
-    File logDir = new File(pipeLogDir);
-    List<Long> startNumbers = new ArrayList<>();
-
-    for (File file : logDir.listFiles())
-      if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX) && 
file.length() > 0) {
-        
startNumbers.add(SyncPathUtil.getSerialNumberFromPipeLogName(file.getName()));
-      }
-    if (!startNumbers.isEmpty()) {
-      Collections.sort(startNumbers);
-      for (Long startTime : startNumbers) {
-        pipeLogStartNumber.offer(startTime);
-      }
-    }
-  }
-
-  private void recoverLastMaxSerialNumber() {
-    if (pipeLogStartNumber.isEmpty()) {
-      return;
-    }
-
-    File writingPipeLog =
-        new File(pipeLogDir, 
SyncPathUtil.getPipeLogName(pipeLogStartNumber.peekLast()));
-    try {
-      List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog);
-      int recoverPipeDataSize = recoverPipeData.size();
-      lastMaxSerialNumber =
-          recoverPipeDataSize == 0
-              ? pipeLogStartNumber.peekLast() - 1
-              : recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber();
-    } catch (IOException e) {
-      logger.error(
-          String.format("Can not recover inputQueue from %s.", 
writingPipeLog.getPath()), e);
-    }
-  }
-
-  private void recoverCommitSerialNumber() {
-    File commitLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
-    if (!commitLog.exists()) {
-      if (!pipeLogStartNumber.isEmpty()) {
-        commitSerialNumber = pipeLogStartNumber.peek() - 1;
-      }
-      return;
-    }
-
-    try (RandomAccessFile raf = new RandomAccessFile(commitLog, "r")) {
-      if (raf.length() >= Long.BYTES) {
-        raf.seek(raf.length() - Long.BYTES);
-        commitSerialNumber = raf.readLong();
-      }
-    } catch (IOException e) {
-      logger.error(
-          String.format(
-              "deserialize remove serial number error, remove serial number 
has been set to %d.",
-              commitSerialNumber),
-          e);
-    }
-  }
-
-  private void recoverOutputDeque() {
-    if (pipeLogStartNumber.isEmpty()) {
-      return;
-    }
-
-    File readingPipeLog =
-        new File(pipeLogDir, 
SyncPathUtil.getPipeLogName(pipeLogStartNumber.peek()));
-    try {
-      List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog);
-      int recoverPipeDataSize = recoverPipeData.size();
-      for (int i = recoverPipeDataSize - 1; i >= 0; --i) {
-        PipeData pipeData = recoverPipeData.get(i);
-        if (pipeData.getSerialNumber() <= commitSerialNumber) {
-          break;
-        }
-        outputDeque.addFirst(pipeData);
-      }
-    } catch (IOException e) {
-      logger.error(
-          String.format("Recover output deque from pipe log %s error.", 
readingPipeLog.getPath()),
-          e);
-    }
-  }
-
-  public long getLastMaxSerialNumber() {
-    return lastMaxSerialNumber;
-  }
-
-  public long getCommitSerialNumber() {
-    return commitSerialNumber;
-  }
-
-  /** input */
-  @Override
-  public boolean offer(PipeData pipeData) {
-    if (outputStream == null || currentPipeLogSize > 
SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
-      try {
-        moveToNextPipeLog(pipeData.getSerialNumber());
-      } catch (IOException e) {
-        logger.error(String.format("Move to next pipe log %s error.", 
pipeData), e);
-      }
-    }
-    synchronized (waitLock) {
-      if (!inputDeque.offer(pipeData)) {
-        waitLock.notifyAll();
-        return false;
-      }
-      waitLock.notifyAll();
-    }
-
-    try {
-      writeToDisk(pipeData);
-    } catch (IOException e) {
-      logger.error(String.format("Record pipe data %s error.", pipeData), e);
-      return false;
-    }
-    return true;
-  }
-
-  private synchronized void moveToNextPipeLog(long startSerialNumber) throws 
IOException {
-    if (outputStream != null) {
-      outputStream.close();
-    }
-    File newPipeLog = new File(pipeLogDir, 
SyncPathUtil.getPipeLogName(startSerialNumber));
-    SyncPathUtil.createFile(newPipeLog);
-
-    outputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
-    pipeLogStartNumber.offer(startSerialNumber);
-    currentPipeLogSize = 0;
-
-    inputDeque = new LinkedBlockingDeque<>();
-    if (commitSerialNumber == Long.MIN_VALUE) {
-      commitSerialNumber = startSerialNumber - 1;
-    }
-  }
-
-  private void writeToDisk(PipeData pipeData) throws IOException {
-    // skip trick
-
-    currentPipeLogSize += pipeData.serialize(outputStream);
-    outputStream.flush();
-  }
-
-  /** output */
-  private synchronized PipeData pullOnePipeData(long lastSerialNumber) throws 
IOException {
-    long serialNumber = lastSerialNumber + 1;
-    if (!outputDeque.isEmpty()) {
-      return outputDeque.poll();
-    } else if (outputDeque != inputDeque) {
-      if (pipeLogStartNumber.isEmpty() || lastSerialNumber == Long.MIN_VALUE) {
-        return null;
-      }
-
-      if (serialNumber > pipeLogStartNumber.peekLast()) {
-        return null;
-      } else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque 
!= null) {
-        outputDeque = inputDeque;
-      } else {
-        long nextStartNumber =
-            pipeLogStartNumber.stream().filter(o -> o >= 
serialNumber).findFirst().get();
-        List<PipeData> parsePipeData =
-            parsePipeLog(new File(pipeLogDir, 
SyncPathUtil.getPipeLogName(nextStartNumber)));
-        int parsePipeDataSize = parsePipeData.size();
-        outputDeque = new LinkedBlockingDeque<>();
-        for (int i = 0; i < parsePipeDataSize; i++) {
-          outputDeque.offer(parsePipeData.get(i));
-        }
-      }
-      return outputDeque.poll();
-    }
-    return null;
-  }
-
-  @Override
-  public List<PipeData> pull(long serialNumber) {
-    throw new NotImplementedException("Not implement pull");
-  }
-
-  @Override
-  public PipeData take() throws InterruptedException {
-    PipeData pipeData = null;
-    try {
-      synchronized (waitLock) {
-        while ((pipeData = pullOnePipeData(commitSerialNumber)) == null) {
-          waitLock.wait();
-          waitLock.notifyAll();
-        }
-      }
-    } catch (IOException e) {
-      logger.error(
-          String.format("Blocking pull pipe data number %s error.", 
commitSerialNumber + 1), e);
-    }
-    outputDeque.addFirst(pipeData);
-    pullSerialNumber = pipeData.getSerialNumber();
-    return pipeData;
-  }
-
-  @Override
-  public void commit() {
-    if (pullSerialNumber == Long.MIN_VALUE) {
-      return;
-    }
-    commit(pullSerialNumber);
-  }
-
-  @Override
-  public void commit(long serialNumber) {
-    deletePipeData(serialNumber);
-    deletePipeLog();
-    serializeCommitSerialNumber();
-  }
-
-  private void deletePipeData(long serialNumber) {
-    while (commitSerialNumber < serialNumber) {
-      PipeData commitData = null;
-      try {
-        commitData = pullOnePipeData(commitSerialNumber);
-        if (commitData == null) {
-          return;
-        }
-        if (PipeData.PipeDataType.TSFILE.equals(commitData.getPipeDataType())) 
{
-          List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false);
-          for (File file : tsFiles) {
-            Files.deleteIfExists(file.toPath());
-          }
-        }
-      } catch (IOException e) {
-        logger.error(
-            String.format("Commit pipe data serial number %s error.", 
commitSerialNumber), e);
-      }
-      if (commitData != null) {
-        commitSerialNumber = commitData.getSerialNumber();
-      }
-    }
-  }
-
-  private void deletePipeLog() {
-    if (pipeLogStartNumber.size() >= 2) {
-      long nowPipeLogStartNumber;
-      while (true) {
-        nowPipeLogStartNumber = pipeLogStartNumber.poll();
-        if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= 
commitSerialNumber) {
-          try {
-            Files.deleteIfExists(
-                new File(pipeLogDir, 
SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath());
-          } catch (IOException e) {
-            logger.warn(String.format("Delete %s-pipe.log error.", 
nowPipeLogStartNumber), e);
-          }
-        } else {
-          break;
-        }
-      }
-      pipeLogStartNumber.addFirst(nowPipeLogStartNumber);
-    }
-  }
-
-  private void serializeCommitSerialNumber() {
-    try {
-      if (commitLogWriter == null) {
-        commitLogWriter =
-            new DataOutputStream(
-                new FileOutputStream(new File(pipeLogDir, 
SyncConstant.COMMIT_LOG_NAME)));
-        currentCommitLogSize = 0;
-      }
-      commitLogWriter.writeLong(commitSerialNumber);
-      commitLogWriter.flush();
-      currentCommitLogSize += Long.BYTES;
-      if (currentCommitLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
-        commitLogWriter.close();
-        commitLogWriter = null;
-      }
-    } catch (IOException e) {
-      logger.error(
-          String.format("Serialize commit serial number %s error.", 
commitSerialNumber), e);
-    }
-  }
-
-  /** common */
-  @Override
-  public synchronized boolean isEmpty() {
-    if (outputDeque == null || pipeLogStartNumber.isEmpty()) {
-      return true;
-    }
-    return pipeLogStartNumber.size() == 1
-        && outputDeque.isEmpty()
-        && (inputDeque == null || inputDeque.isEmpty());
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (outputStream != null) {
-        outputStream.close();
-        outputStream = null;
-      }
-      if (commitLogWriter != null) {
-        commitLogWriter.close();
-        commitLogWriter = null;
-      }
-
-      inputDeque = null;
-      pipeLogStartNumber = null;
-      outputDeque = null;
-    } catch (IOException e) {
-      logger.warn(String.format("Close pipe log dir %s error.", pipeLogDir), 
e);
-    }
-  }
-
-  @Override
-  public void clear() {
-    close();
-
-    File logDir = new File(pipeLogDir);
-    if (logDir.exists()) {
-      FileUtils.deleteDirectory(logDir);
-    }
-  }
-
-  public static List<PipeData> parsePipeLog(File file) throws IOException {
-    List<PipeData> pipeData = new ArrayList<>();
-    try (DataInputStream inputStream = new DataInputStream(new 
FileInputStream(file))) {
-      while (true) {
-        pipeData.add(PipeData.createPipeData(inputStream));
-      }
-    } catch (EOFException e) {
-    } catch (IllegalPathException e) {
-      logger.error(String.format("Parsing pipeLog %s error.", file.getPath()), 
e);
-      throw new IOException(e);
-    }
-    return pipeData;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java
deleted file mode 100644
index 08ecfe0f3ce..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/queue/PipeDataQueue.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.pipe.connector.legacy.pipedata.queue;
-
-import org.apache.iotdb.db.pipe.connector.legacy.pipedata.PipeData;
-
-import java.util.List;
-
-public interface PipeDataQueue {
-  boolean offer(PipeData data);
-
-  List<PipeData> pull(long serialNumber);
-
-  PipeData take() throws InterruptedException;
-
-  void commit();
-
-  void commit(long serialNumber);
-
-  boolean isEmpty();
-
-  void close();
-
-  void clear();
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java
index cfac1eec568..baa87f17f7e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncConstant.java
@@ -20,31 +20,12 @@ package org.apache.iotdb.db.pipe.connector.legacy.utils;
 
 public class SyncConstant {
   /** common */
-  public static final String SYNC_SYS_DIR = "sys";
-
   public static final String FILE_DATA_DIR_NAME = "file-data";
 
-  // pipe log: serialNumber + SEPARATOR + SUFFIX
-  public static final String PIPE_LOG_DIR_NAME = "pipe-log";
   public static final String PIPE_LOG_NAME_SEPARATOR = "_";
   public static final String PIPE_LOG_NAME_SUFFIX = PIPE_LOG_NAME_SEPARATOR + 
"pipe.log";
-  public static final String COMMIT_LOG_NAME = "commit.log";
-  public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
-
-  // persistence
-
-  public static final String SYNC_LOG_NAME = "syncService.log";
-
-  /** sender */
-
-  // dir structure
-  public static final String SENDER_DIR_NAME = "sender";
-
-  public static final String HISTORY_PIPE_LOG_DIR_NAME = "history-" + 
PIPE_LOG_DIR_NAME;
 
   // data config
-  public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
-  public static final int DEFAULT_PIPE_SINK_PORT = 6667;
 
   public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 
500L;
   public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
@@ -52,11 +33,6 @@ public class SyncConstant {
   /** transport */
   public static final String PATCH_SUFFIX = ".patch";
 
-  public static final String IPV4_PATTERN =
-      
"^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$";
-
   /** receiver */
   public static final String RECEIVER_DIR_NAME = "receiver";
-
-  public static final String IP_SEPARATOR = "\\.";
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java
index 057ca641930..39b334c51ca 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/utils/SyncPathUtil.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.pipe.connector.legacy.transport.SyncIdentityInfo;
 
 import java.io.File;
-import java.io.IOException;
 
 /** Util for path generation in sync module */
 public class SyncPathUtil {
@@ -72,20 +71,4 @@ public class SyncPathUtil {
     return SyncPathUtil.getReceiverFileDataDir(
         identityInfo.getPipeName(), identityInfo.getRemoteAddress(), 
identityInfo.getCreateTime());
   }
-
-  /** common */
-  public static String getPipeLogName(long serialNumber) {
-    return serialNumber + SyncConstant.PIPE_LOG_NAME_SUFFIX;
-  }
-
-  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
-    return 
Long.parseLong(pipeLogName.split(SyncConstant.PIPE_LOG_NAME_SEPARATOR)[0]);
-  }
-
-  public static boolean createFile(File file) throws IOException {
-    if (!file.getParentFile().exists()) {
-      file.getParentFile().mkdirs();
-    }
-    return file.createNewFile();
-  }
 }

Reply via email to