This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new c442bf7ca1f Pipe: Support async tsfile load when data syncing between 
clusters (#13311)
c442bf7ca1f is described below

commit c442bf7ca1fba6e8612a1a74219c91f438585858
Author: YC27 <[email protected]>
AuthorDate: Tue Aug 27 16:44:26 2024 +0800

    Pipe: Support async tsfile load when data syncing between clusters (#13311)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    | 97 ++++++++++++++++++++++
 .../client/IoTDBConfigNodeSyncClientManager.java   |  6 +-
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  2 +
 .../protocol/IoTDBConfigRegionConnector.java       |  6 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    | 32 ++++---
 .../client/IoTDBDataNodeSyncClientManager.java     |  6 +-
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  2 +
 .../async/IoTDBDataRegionAsyncConnector.java       |  3 +-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  6 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 32 ++++++-
 .../load/active/ActiveLoadTsFileLoader.java        | 57 +------------
 .../config/constant/PipeConnectorConstant.java     | 11 +++
 .../connector/client/IoTDBSyncClientManager.java   |  8 +-
 .../common/PipeTransferHandshakeConstant.java      |  1 +
 .../pipe/connector/protocol/IoTDBConnector.java    | 20 +++++
 .../connector/protocol/IoTDBSslSyncConnector.java  |  6 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 14 ++++
 .../org/apache/iotdb/commons/utils/FileUtils.java  | 53 ++++++++++++
 18 files changed, 278 insertions(+), 84 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 7fda1250b1b..26c76bf0042 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -351,4 +351,101 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
           Collections.singleton("root.ln.wf01.wt02,true,null,INF,"));
     }
   }
+
+  @Test
+  public void testSyncLoadTsFile() throws Exception {
+    testReceiverLoadTsFile("sync");
+  }
+
+  @Test
+  public void testAsyncLoadTsFile() throws Exception {
+    testReceiverLoadTsFile("async");
+  }
+
+  private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws 
Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", 
"flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.realtime.mode", "forced-log");
+
+      connectorAttributes.put("sink", "iotdb-thrift-sink");
+      connectorAttributes.put("sink.batch.enable", "false");
+      connectorAttributes.put("sink.ip", receiverIp);
+      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+      connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(extractorAttributes)
+                      .setProcessorAttributes(processorAttributes))
+              .getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", 
"flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", 
"2,1.0,"))));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(extractorAttributes)
+                      .setProcessorAttributes(processorAttributes))
+              .getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.vehicle.d0(time, s1) values (4, 1)",
+              "insert into root.vehicle.d0(time, s1) values (3, 1), (0, 1)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.unmodifiableSet(
+              new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", 
"3,1.0,", "4,1.0,"))));
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index fd5b3c9ddb9..b2967e3bdf8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -39,7 +39,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String trustStorePath,
       String trustStorePwd,
       String loadBalanceStrategy,
-      boolean shouldReceiverConvertOnTypeMismatch) {
+      boolean shouldReceiverConvertOnTypeMismatch,
+      String loadTsFileStrategy) {
     super(
         endPoints,
         useSSL,
@@ -47,7 +48,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
         trustStorePwd,
         false,
         loadBalanceStrategy,
-        shouldReceiverConvertOnTypeMismatch);
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ba151351c39..0d466605d8f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -71,6 +71,8 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 96059980c38..1e2f3c19e5a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -63,14 +63,16 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final String trustStorePwd,
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
-      final boolean shouldReceiverConvertOnTypeMismatch) {
+      final boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
         useSSL,
         trustStorePath,
         trustStorePwd,
         loadBalanceStrategy,
-        shouldReceiverConvertOnTypeMismatch);
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 6814f8151b9..bfe5360e749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -43,8 +43,10 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -60,35 +62,38 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   private final Set<TEndPoint> endPointSet;
 
-  private static final AtomicReference<
-          IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
-      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
+  private static final Map<String, IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>>
+      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new 
ConcurrentHashMap<>();
   private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
endPoint2Client;
 
   private final LoadBalancer loadBalancer;
 
   private final boolean shouldReceiverConvertOnTypeMismatch;
 
+  private final String loadTsFileStrategy;
+
   public IoTDBDataNodeAsyncClientManager(
       List<TEndPoint> endPoints,
       boolean useLeaderCache,
       String loadBalanceStrategy,
-      boolean shouldReceiverConvertOnTypeMismatch) {
+      boolean shouldReceiverConvertOnTypeMismatch,
+      String loadTsFileStrategy) {
     super(endPoints, useLeaderCache);
 
     endPointSet = new HashSet<>(endPoints);
 
-    if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+    final String receiverAttributes =
+        String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, 
loadTsFileStrategy);
+    if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
       synchronized (IoTDBDataRegionAsyncConnector.class) {
-        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
-              new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
-                  .createClientManager(
-                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
-        }
+        ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
+            receiverAttributes,
+            new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+                .createClientManager(
+                    new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
       }
     }
-    endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+    endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
 
     switch (loadBalanceStrategy) {
       case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
@@ -108,6 +113,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
     }
 
     this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
+    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -219,6 +225,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index df9ae2a0e29..5a341dc7574 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -49,7 +49,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       String trustStorePwd,
       boolean useLeaderCache,
       String loadBalanceStrategy,
-      boolean shouldReceiverConvertOnTypeMismatch) {
+      boolean shouldReceiverConvertOnTypeMismatch,
+      String loadTsFileStrategy) {
     super(
         endPoints,
         useSSL,
@@ -57,7 +58,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
         trustStorePwd,
         useLeaderCache,
         loadBalanceStrategy,
-        shouldReceiverConvertOnTypeMismatch);
+        shouldReceiverConvertOnTypeMismatch,
+        loadTsFileStrategy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index f0491431196..221fa516448 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -103,6 +103,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
         Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 3f37b247869..394e2e95d2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -125,7 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
             loadBalanceStrategy,
-            shouldReceiverConvertOnTypeMismatch);
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 32c886fbe56..9580346d25b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -87,7 +87,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       final String trustStorePwd,
       final boolean useLeaderCache,
       final String loadBalanceStrategy,
-      final boolean shouldReceiverConvertOnTypeMismatch) {
+      final boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
@@ -96,7 +97,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
             trustStorePwd,
             useLeaderCache,
             loadBalanceStrategy,
-            shouldReceiverConvertOnTypeMismatch);
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy);
     return clientManager;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 20a34319d35..52bbe1f68bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransf
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -87,12 +88,14 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -405,8 +408,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   @Override
   protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final 
String fileAbsolutePath)
-      throws FileNotFoundException {
-    return loadTsFile(fileAbsolutePath);
+      throws IOException {
+    return isUsingAsyncLoadTsFileStrategy.get()
+        ? loadTsFileAsync(Collections.singletonList(fileAbsolutePath))
+        : loadTsFileSync(fileAbsolutePath);
   }
 
   @Override
@@ -415,11 +420,30 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       throws IOException, IllegalPathException {
     return req instanceof PipeTransferTsFileSealWithModReq
         // TsFile's absolute path will be the second element
-        ? loadTsFile(fileAbsolutePaths.get(1))
+        ? (isUsingAsyncLoadTsFileStrategy.get()
+            ? loadTsFileAsync(fileAbsolutePaths)
+            : loadTsFileSync(fileAbsolutePaths.get(1)))
         : loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
   }
 
-  private TSStatus loadTsFile(final String fileAbsolutePath) throws 
FileNotFoundException {
+  private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws 
IOException {
+    final String loadActiveListeningPipeDir = 
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
+
+    for (final String absolutePath : absolutePaths) {
+      if (absolutePath == null) {
+        continue;
+      }
+      final File sourceFile = new File(absolutePath);
+      if (!Objects.equals(
+          loadActiveListeningPipeDir, 
sourceFile.getParentFile().getAbsolutePath())) {
+        FileUtils.moveFileWithMD5Check(sourceFile, new 
File(loadActiveListeningPipeDir));
+      }
+    }
+
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  private TSStatus loadTsFileSync(final String fileAbsolutePath) throws 
FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(fileAbsolutePath);
 
     statement.setDeleteAfterLoad(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index a5b45146067..b130e58e2f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -36,9 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +44,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
 import java.util.Objects;
@@ -231,7 +227,6 @@ public class ActiveLoadTsFileLoader {
           "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to 
memory constraints, will retry later.",
           filePair.getLeft(),
           filePair.getRight());
-      pendingQueue.enqueue(filePair.getLeft(), filePair.getRight());
     } else {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of 
an unexpected exception. File will be moved to fail directory.",
@@ -257,62 +252,12 @@ public class ActiveLoadTsFileLoader {
 
     final File targetDir = new File(failDir.get());
     try {
-      moveFileWithMD5Check(sourceFile, targetDir);
+      
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile, 
targetDir);
     } catch (final IOException e) {
       LOGGER.warn("Error occurred during moving file {} to fail directory.", 
filePath, e);
     }
   }
 
-  private static void moveFileWithMD5Check(final File sourceFile, final File 
targetDir)
-      throws IOException {
-    final String sourceFileName = sourceFile.getName();
-    final File targetFile = new File(targetDir, sourceFileName);
-
-    if (targetFile.exists()) {
-      if (haveSameMD5(sourceFile, targetFile)) {
-        FileUtils.forceDelete(sourceFile);
-        LOGGER.info(
-            "Deleted the file {} because it already exists in the fail 
directory: {}",
-            sourceFile.getName(),
-            targetDir.getAbsolutePath());
-      } else {
-        renameWithMD5(sourceFile, targetDir);
-        LOGGER.info(
-            "Renamed file {} to {} because it already exists in the fail 
directory: {}",
-            sourceFile.getName(),
-            targetFile.getName(),
-            targetDir.getAbsolutePath());
-      }
-    } else {
-      FileUtils.moveFileToDirectory(sourceFile, targetDir, true);
-      LOGGER.info(
-          "Moved file {} to fail directory {}.", sourceFile.getName(), 
targetDir.getAbsolutePath());
-    }
-  }
-
-  private static boolean haveSameMD5(final File file1, final File file2) {
-    try (final InputStream is1 = Files.newInputStream(file1.toPath());
-        final InputStream is2 = Files.newInputStream(file2.toPath())) {
-      return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
-    } catch (final Exception e) {
-      return false;
-    }
-  }
-
-  private static void renameWithMD5(File sourceFile, File targetDir) throws 
IOException {
-    try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
-      final String sourceFileBaseName = 
FilenameUtils.getBaseName(sourceFile.getName());
-      final String sourceFileExtension = 
FilenameUtils.getExtension(sourceFile.getName());
-      final String sourceFileMD5 = DigestUtils.md5Hex(is);
-
-      final String targetFileName =
-          sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." + 
sourceFileExtension;
-      final File targetFile = new File(targetDir, targetFileName);
-
-      FileUtils.moveFile(sourceFile, targetFile, 
StandardCopyOption.REPLACE_EXISTING);
-    }
-  }
-
   // Metrics
   public long countAndReportFailedFileNumber() {
     final long[] fileCount = {0};
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 87ec0aaaeae..77eccfb4069 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -225,6 +225,17 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_CONSENSUS_GROUP_ID_KEY = 
"connector.consensus.group-id";
   public static final String CONNECTOR_CONSENSUS_PIPE_NAME = 
"connector.consensus.pipe-name";
 
+  public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_KEY = 
"connector.load-tsfile-strategy";
+  public static final String SINK_LOAD_TSFILE_STRATEGY_KEY = 
"sink.load-tsfile-strategy";
+  public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE = 
"async";
+  public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE = 
"sync";
+  public static final Set<String> CONNECTOR_LOAD_TSFILE_STRATEGY_SET =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
+                  CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)));
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 57b9a4fede4..81b5194621c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -62,6 +62,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
 
   private final boolean shouldReceiverConvertOnTypeMismatch;
 
+  private final String loadTsFileStrategy;
+
   protected IoTDBSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
@@ -69,7 +71,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       String trustStorePwd,
       boolean useLeaderCache,
       String loadBalanceStrategy,
-      boolean shouldReceiverConvertOnTypeMismatch) {
+      boolean shouldReceiverConvertOnTypeMismatch,
+      String loadTsFileStrategy) {
     super(endPoints, useLeaderCache);
 
     this.useSSL = useSSL;
@@ -98,6 +101,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
     }
 
     this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
+    this.loadTsFileStrategy = loadTsFileStrategy;
   }
 
   public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -177,6 +181,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
           Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
loadTsFileStrategy);
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index b6864706aaa..ec8818072d2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -24,6 +24,7 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_TIME_PRECISION = 
"timestampPrecision";
   public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
   public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH = 
"convertOnTypeMismatch";
+  public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY = 
"loadTsFileStrategy";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 55231032e5a..eb66a638672 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -83,6 +83,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -101,6 +104,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
@@ -116,6 +120,8 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
   protected String loadBalanceStrategy;
 
+  protected String loadTsFileStrategy;
+
   private boolean isRpcCompressionEnabled;
   private final List<PipeCompressor> compressors = new ArrayList<>();
 
@@ -183,6 +189,20 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
         loadBalanceStrategy);
 
+    loadTsFileStrategy =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_LOAD_TSFILE_STRATEGY_KEY, 
SINK_LOAD_TSFILE_STRATEGY_KEY),
+                CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)
+            .trim()
+            .toLowerCase();
+    validator.validate(
+        arg -> CONNECTOR_LOAD_TSFILE_STRATEGY_SET.contains(loadTsFileStrategy),
+        String.format(
+            "Load tsfile strategy should be one of %s, but got %s.",
+            CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
+        loadTsFileStrategy);
+
     final int zstdCompressionLevel =
         parameters.getIntOrDefault(
             Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY, 
SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 271596cf549..b1a0174e75c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -122,7 +122,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             trustStorePwd,
             useLeaderCache,
             loadBalanceStrategy,
-            shouldReceiverConvertOnTypeMismatch);
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -132,7 +133,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       String trustStorePwd,
       boolean useLeaderCache,
       String loadBalanceStrategy,
-      boolean shouldReceiverConvertOnTypeMismatch);
+      boolean shouldReceiverConvertOnTypeMismatch,
+      String loadTsFileStrategy);
 
   @Override
   public void handshake() throws Exception {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 7197e112526..4b37cf9aabd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -46,6 +47,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -71,6 +73,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected boolean shouldConvertDataTypeOnTypeMismatch =
       CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
+  // Used to determine current strategy is sync or async
+  protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new 
AtomicBoolean(false);
+
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
     return IoTDBConnectorRequestVersion.VERSION_1;
@@ -228,6 +233,15 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString);
     }
 
+    final String loadTsFileStrategyString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY);
+    if (loadTsFileStrategyString != null) {
+      isUsingAsyncLoadTsFileStrategy.set(
+          Objects.equals(
+              PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
+              loadTsFileStrategyString));
+    }
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index ee2f5098642..71498bbe220 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,11 +32,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.text.CharacterIterator;
 import java.text.StringCharacterIterator;
 import java.util.Arrays;
@@ -304,4 +308,53 @@ public class FileUtils {
     }
     return String.format("%.2f %cB", bytes / 1000.0, ci.current());
   }
+
+  public static void moveFileWithMD5Check(final File sourceFile, final File 
targetDir)
+      throws IOException {
+    final String sourceFileName = sourceFile.getName();
+    final File targetFile = new File(targetDir, sourceFileName);
+
+    if (targetFile.exists()) {
+      if (haveSameMD5(sourceFile, targetFile)) {
+        org.apache.commons.io.FileUtils.forceDelete(sourceFile);
+        LOGGER.info(
+            "Deleted the file {} because it already exists in the fail 
directory: {}",
+            sourceFile.getName(),
+            targetDir.getAbsolutePath());
+      } else {
+        renameWithMD5(sourceFile, targetDir);
+        LOGGER.info(
+            "Renamed file {} to {} because it already exists in the fail 
directory: {}",
+            sourceFile.getName(),
+            targetFile.getName(),
+            targetDir.getAbsolutePath());
+      }
+    } else {
+      org.apache.commons.io.FileUtils.moveFileToDirectory(sourceFile, 
targetDir, true);
+    }
+  }
+
+  private static boolean haveSameMD5(final File file1, final File file2) {
+    try (final InputStream is1 = Files.newInputStream(file1.toPath());
+        final InputStream is2 = Files.newInputStream(file2.toPath())) {
+      return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
+    } catch (final Exception e) {
+      return false;
+    }
+  }
+
+  private static void renameWithMD5(File sourceFile, File targetDir) throws 
IOException {
+    try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+      final String sourceFileBaseName = 
FilenameUtils.getBaseName(sourceFile.getName());
+      final String sourceFileExtension = 
FilenameUtils.getExtension(sourceFile.getName());
+      final String sourceFileMD5 = DigestUtils.md5Hex(is);
+
+      final String targetFileName =
+          sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." + 
sourceFileExtension;
+      final File targetFile = new File(targetDir, targetFileName);
+
+      org.apache.commons.io.FileUtils.moveFile(
+          sourceFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
 }


Reply via email to