This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7036e9ff935 Pipe: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process (#12485)
7036e9ff935 is described below

commit 7036e9ff9359fbc903b6695242abaf3eab4a881f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 8 20:36:44 2024 +0800

    Pipe: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process (#12485)
    
    * Pipe: dynamically adjust connection timeout to handle SocketTimeoutException
    
    * Load: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process
---
 .../async/IoTDBDataRegionAsyncConnector.java       |   2 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |  19 ++-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |   2 +
 .../exchange/sender/TwoStageAggregateSender.java   |   2 +-
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   2 +-
 .../execution/load/LoadTsFileManager.java          |  65 +++++--
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 190 ++++++++++++---------
 .../iotdb/db/storageengine/StorageEngine.java      |   3 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  24 ++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   4 +-
 .../pipe/connector/client/IoTDBClientManager.java  |  37 ++++
 .../connector/client/IoTDBSyncClientManager.java   |   4 +-
 .../connector/protocol/IoTDBAirGapConnector.java   |   2 +-
 13 files changed, 249 insertions(+), 107 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56ad0b97686..02c02b94b5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -291,7 +291,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient();
-      pipeTransferTsFileInsertionEventHandler.transfer(client);
+      pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
       pipeTransferTsFileInsertionEventHandler.onError(ex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index bcafce049aa..9e426f41702 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
@@ -68,6 +69,7 @@ public class PipeTransferTsFileInsertionEventHandler
 
   private final AtomicBoolean isSealSignalSent;
 
+  private IoTDBDataNodeAsyncClientManager clientManager;
   private AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileInsertionEventHandler(
@@ -93,10 +95,15 @@ public class PipeTransferTsFileInsertionEventHandler
     isSealSignalSent = new AtomicBoolean(false);
   }
 
-  public void transfer(final AsyncPipeDataTransferServiceClient client)
+  public void transfer(
+      IoTDBDataNodeAsyncClientManager clientManager,
+      final AsyncPipeDataTransferServiceClient client)
       throws TException, IOException {
+    this.clientManager = clientManager;
     this.client = client;
+
     client.setShouldReturnSelf(false);
+    client.setTimeout(clientManager.getConnectionTimeout());
 
     final int readLength = reader.read(readBuffer);
 
@@ -110,7 +117,7 @@ public class PipeTransferTsFileInsertionEventHandler
           LOGGER.warn("Failed to close file reader when successfully 
transferred mod file.", e);
         }
         reader = new RandomAccessFile(tsFile, "r");
-        transfer(client);
+        transfer(clientManager, client);
       } else if (currentFile == tsFile) {
         isSealSignalSent.set(true);
         client.pipeTransfer(
@@ -205,7 +212,7 @@ public class PipeTransferTsFileInsertionEventHandler
         }
       }
 
-      transfer(client);
+      transfer(clientManager, client);
     } catch (final Exception e) {
       onError(e);
     }
@@ -220,6 +227,12 @@ public class PipeTransferTsFileInsertionEventHandler
         event.getCommitId(),
         exception);
 
+    try {
+      clientManager.adjustTimeoutIfNecessary(exception);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
+    }
+
     try {
       if (reader != null) {
         reader.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 9919d954b30..f873cc27646 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -350,6 +350,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
                         modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", 
tsFile, e.getMessage()),
             e);
@@ -366,6 +367,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
                         tsFile.getName(), tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", 
tsFile, e.getMessage()),
             e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 90c6a2f860a..a24b6c4fc39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,7 +208,7 @@ public class TwoStageAggregateSender implements 
AutoCloseable {
   private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws 
TTransportException {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
-            .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
             .setRpcThriftCompressionEnabled(
                 PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
             .build(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 99d454de382..4cd97d7cb37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -68,7 +68,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
   @Override
   public void runMayThrow() throws Throwable {
-    socket.setSoTimeout((int) 
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+    
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
     socket.setKeepAlive(true);
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index ef4ed7f16b9..b568b887988 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 
@@ -104,6 +105,13 @@ public class LoadTsFileManager {
 
         final CleanupTask cleanupTask = cleanupTaskQueue.peek();
         if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+          if (cleanupTask.isLoadTaskRunning) {
+            cleanupTaskQueue.poll();
+            cleanupTask.resetScheduledTime();
+            cleanupTaskQueue.add(cleanupTask);
+            continue;
+          }
+
           cleanupTask.run();
 
           uuid2CleanupTask.remove(cleanupTask.uuid);
@@ -157,17 +165,24 @@ public class LoadTsFileManager {
       }
     }
 
-    final TsFileWriterManager writerManager =
-        uuid2WriterManager.computeIfAbsent(
-            uuid, o -> new 
TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
-    for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
-      if (!tsFileData.isModification()) {
-        ChunkData chunkData = (ChunkData) tsFileData;
-        writerManager.write(
-            new DataPartitionInfo(dataRegion, 
chunkData.getTimePartitionSlot()), chunkData);
-      } else {
-        writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+    final Optional<CleanupTask> cleanupTask = 
Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      final TsFileWriterManager writerManager =
+          uuid2WriterManager.computeIfAbsent(
+              uuid,
+              o -> new 
TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
+      for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+        if (!tsFileData.isModification()) {
+          ChunkData chunkData = (ChunkData) tsFileData;
+          writerManager.write(
+              new DataPartitionInfo(dataRegion, 
chunkData.getTimePartitionSlot()), chunkData);
+        } else {
+          writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+        }
       }
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
     }
   }
 
@@ -176,7 +191,15 @@ public class LoadTsFileManager {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+
+    final Optional<CleanupTask> cleanupTask = 
Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
+    }
+
     clean(uuid);
     return true;
   }
@@ -412,12 +435,30 @@ public class LoadTsFileManager {
   private class CleanupTask implements Runnable, Comparable<CleanupTask> {
 
     private final String uuid;
-    private final long scheduledTime;
 
+    private final long delayInMs;
+    private long scheduledTime;
+
+    private volatile boolean isLoadTaskRunning = false;
     private volatile boolean isCanceled = false;
 
     private CleanupTask(String uuid, long delayInMs) {
       this.uuid = uuid;
+      this.delayInMs = delayInMs;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskRunning() {
+      isLoadTaskRunning = true;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskNotRunning() {
+      isLoadTaskRunning = false;
+      resetScheduledTime();
+    }
+
+    public void resetScheduledTime() {
       scheduledTime = System.currentTimeMillis() + delayInMs;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 0d5f0bb9794..83a12e0d637 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -51,23 +50,29 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.concurrent.SetThreadName;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+      new 
AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
+
   private String uuid;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
@@ -76,8 +81,6 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   private final ExecutorService executor;
   private final boolean isGeneratedByPipe;
 
-  private static final String NODE_CONNECTION_ERROR = "can't connect to node 
{}";
-
   public LoadTsFileDispatcherImpl(
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager,
       boolean isGeneratedByPipe) {
@@ -137,78 +140,6 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return this.localhostIpAddr.equals(endPoint.getIp()) && 
localhostInternalPort == endPoint.port;
-  }
-
-  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint 
endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      String warning = NODE_CONNECTION_ERROR;
-      LOGGER.warn(warning, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(warning + endPoint);
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint 
endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      LOGGER.warn(NODE_CONNECTION_ERROR, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(
-          "can't connect to node "
-              + endPoint
-              + ", please reset longer dn_connection_timeout_ms "
-              + "in iotdb-datanode.properties and restart iotdb.");
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchLocally(TLoadCommandReq loadCommandReq)
-      throws FragmentInstanceDispatchException {
-    final ProgressIndex progressIndex;
-    if (loadCommandReq.isSetProgressIndex()) {
-      progressIndex =
-          
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
-    } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.",
-          progressIndex,
-          loadCommandReq.uuid);
-    }
-
-    final TSStatus resultStatus =
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid,
-                loadCommandReq.isSetIsGeneratedByPipe() && 
loadCommandReq.isGeneratedByPipe,
-                progressIndex);
-    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
-      throw new FragmentInstanceDispatchException(resultStatus);
-    }
-  }
-
   public void dispatchLocally(FragmentInstance instance) throws 
FragmentInstanceDispatchException {
     LOGGER.info("Receive load node from uuid {}.", uuid);
 
@@ -259,6 +190,32 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
+  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint 
endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of 
exception: %s",
+              loadTsFileReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
     Set<TEndPoint> allEndPoint = new HashSet<>();
@@ -290,6 +247,87 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
+  private void dispatchLocally(TLoadCommandReq loadCommandReq)
+      throws FragmentInstanceDispatchException {
+    final ProgressIndex progressIndex;
+    if (loadCommandReq.isSetProgressIndex()) {
+      progressIndex =
+          
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    } else {
+      // fallback to use local generated progress index for compatibility
+      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+      LOGGER.info(
+          "Use local generated load progress index {} for uuid {}.",
+          progressIndex,
+          loadCommandReq.uuid);
+    }
+
+    final TSStatus resultStatus =
+        StorageEngine.getInstance()
+            .executeLoadCommand(
+                
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+                loadCommandReq.uuid,
+                loadCommandReq.isSetIsGeneratedByPipe() && 
loadCommandReq.isGeneratedByPipe,
+                progressIndex);
+    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+      throw new FragmentInstanceDispatchException(resultStatus);
+    }
+  }
+
+  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint 
endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of 
exception: %s",
+              loadCommandReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
+  private boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp()) && 
localhostInternalPort == endPoint.port;
+  }
+
+  private static void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(
+                  Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), 
MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
+          CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+          LOGGER.info(
+              "Load remote procedure call connection timeout is adjusted to {} 
ms ({} mins)",
+              newConnectionTimeout,
+              newConnectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
   @Override
   public void abort() {
     // Do nothing
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 5bcff3443ec..c8c35e0958c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -903,7 +902,7 @@ public class StorageEngine implements IService {
           status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
           status.setMessage(String.format("Wrong load command %s.", 
loadCommand));
       }
-    } catch (IOException | LoadFileException e) {
+    } catch (Exception e) {
       LOGGER.error("Execute load command {} error.", loadCommand, e);
       status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4e4e76ea192..493ea0651ac 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,8 +185,8 @@ public class CommonConfig {
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B
   private int pipeExtractorMatcherCacheSize = 1024;
 
-  private long pipeConnectorHandshakeTimeoutMs = 10 * 1000L; // 10 seconds
-  private long pipeConnectorTransferTimeoutMs = 15 * 60 * 1000L; // 15 minutes
+  private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
   // recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
@@ -634,20 +634,32 @@ public class CommonConfig {
     this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
   }
 
-  public long getPipeConnectorHandshakeTimeoutMs() {
+  public int getPipeConnectorHandshakeTimeoutMs() {
     return pipeConnectorHandshakeTimeoutMs;
   }
 
   public void setPipeConnectorHandshakeTimeoutMs(long 
pipeConnectorHandshakeTimeoutMs) {
-    this.pipeConnectorHandshakeTimeoutMs = pipeConnectorHandshakeTimeoutMs;
+    try {
+      this.pipeConnectorHandshakeTimeoutMs = 
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe connector handshake timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
+    }
   }
 
-  public long getPipeConnectorTransferTimeoutMs() {
+  public int getPipeConnectorTransferTimeoutMs() {
     return pipeConnectorTransferTimeoutMs;
   }
 
   public void setPipeConnectorTransferTimeoutMs(long 
pipeConnectorTransferTimeoutMs) {
-    this.pipeConnectorTransferTimeoutMs = pipeConnectorTransferTimeoutMs;
+    try {
+      this.pipeConnectorTransferTimeoutMs = 
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
+    }
   }
 
   public int getPipeConnectorReadFileBufferSize() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 7a0391d1dd8..dbe4b6a9c9f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -95,11 +95,11 @@ public class PipeConfig {
 
   /////////////////////////////// Connector ///////////////////////////////
 
-  public long getPipeConnectorHandshakeTimeoutMs() {
+  public int getPipeConnectorHandshakeTimeoutMs() {
     return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
   }
 
-  public long getPipeConnectorTransferTimeoutMs() {
+  public int getPipeConnectorTransferTimeoutMs() {
     return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index b645b9110ef..bb1f0062363 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -20,11 +20,18 @@
 package org.apache.iotdb.commons.pipe.connector.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
 import java.util.List;
 
 public abstract class IoTDBClientManager {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBClientManager.class);
+
   protected final List<TEndPoint> endPointList;
   protected long currentClientIndex = 0;
 
@@ -34,6 +41,9 @@ public abstract class IoTDBClientManager {
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  protected int connectionTimeout = 
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+
   protected IoTDBClientManager(List<TEndPoint> endPointList, boolean 
useLeaderCache) {
     this.endPointList = endPointList;
     this.useLeaderCache = useLeaderCache;
@@ -42,4 +52,31 @@ public abstract class IoTDBClientManager {
   public boolean supportModsIfIsDataNodeReceiver() {
     return supportModsIfIsDataNodeReceiver;
   }
+
+  public void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(Math.toIntExact(connectionTimeout * 2L), 
MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != connectionTimeout) {
+          connectionTimeout = newConnectionTimeout;
+          LOGGER.info(
+              "Pipe connection timeout is adjusted to {} ms ({} mins)",
+              connectionTimeout,
+              connectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index c262313f24a..a29d7bdb0a2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -142,7 +142,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       clientAndStatus.setLeft(
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs((int) 
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
                       PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
                   .build(),
@@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
             resp.getStatus());
       } else {
         clientAndStatus.setRight(true);
-        client.setTimeout((int) 
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        client.setTimeout(connectionTimeout);
         LOGGER.info(
             "Handshake success. Target server ip: {}, port: {}",
             client.getIpAddress(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 1977c32d338..3034b581d5d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -188,7 +188,7 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
     } else {
       supportModsIfIsDataNodeReceiver = true;
     }
-    socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
     LOGGER.info("Handshake success. Socket: {}", socket);
   }
 

Reply via email to