This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sync-huge-tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e0a4c8d3c3fab840fd8e94b949120a1651e687c8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 8 15:10:00 2024 +0800

    Load: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process
---
 .../execution/load/LoadTsFileManager.java          |  65 +++++--
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 190 ++++++++++++---------
 .../iotdb/db/storageengine/StorageEngine.java      |   3 +-
 3 files changed, 168 insertions(+), 90 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index ef4ed7f16b9..b568b887988 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 
@@ -104,6 +105,13 @@ public class LoadTsFileManager {
 
         final CleanupTask cleanupTask = cleanupTaskQueue.peek();
         if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+          if (cleanupTask.isLoadTaskRunning) {
+            cleanupTaskQueue.poll();
+            cleanupTask.resetScheduledTime();
+            cleanupTaskQueue.add(cleanupTask);
+            continue;
+          }
+
           cleanupTask.run();
 
           uuid2CleanupTask.remove(cleanupTask.uuid);
@@ -157,17 +165,24 @@ public class LoadTsFileManager {
       }
     }
 
-    final TsFileWriterManager writerManager =
-        uuid2WriterManager.computeIfAbsent(
-            uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
-    for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
-      if (!tsFileData.isModification()) {
-        ChunkData chunkData = (ChunkData) tsFileData;
-        writerManager.write(
-            new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
-      } else {
-        writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      final TsFileWriterManager writerManager =
+          uuid2WriterManager.computeIfAbsent(
+              uuid,
+              o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
+      for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+        if (!tsFileData.isModification()) {
+          ChunkData chunkData = (ChunkData) tsFileData;
+          writerManager.write(
+              new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
+        } else {
+          writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+        }
       }
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
     }
   }
 
@@ -176,7 +191,15 @@ public class LoadTsFileManager {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+
+    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
+    }
+
     clean(uuid);
     return true;
   }
@@ -412,12 +435,30 @@ public class LoadTsFileManager {
   private class CleanupTask implements Runnable, Comparable<CleanupTask> {
 
     private final String uuid;
-    private final long scheduledTime;
 
+    private final long delayInMs;
+    private long scheduledTime;
+
+    private volatile boolean isLoadTaskRunning = false;
     private volatile boolean isCanceled = false;
 
     private CleanupTask(String uuid, long delayInMs) {
       this.uuid = uuid;
+      this.delayInMs = delayInMs;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskRunning() {
+      isLoadTaskRunning = true;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskNotRunning() {
+      isLoadTaskRunning = false;
+      resetScheduledTime();
+    }
+
+    public void resetScheduledTime() {
       scheduledTime = System.currentTimeMillis() + delayInMs;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 0d5f0bb9794..83a12e0d637 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -51,23 +50,29 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.concurrent.SetThreadName;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+      new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
+
   private String uuid;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
@@ -76,8 +81,6 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
   private final ExecutorService executor;
   private final boolean isGeneratedByPipe;
 
-  private static final String NODE_CONNECTION_ERROR = "can't connect to node {}";
-
   public LoadTsFileDispatcherImpl(
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
       boolean isGeneratedByPipe) {
@@ -137,78 +140,6 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
     }
   }
 
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
-  }
-
-  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      String warning = NODE_CONNECTION_ERROR;
-      LOGGER.warn(warning, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(warning + endPoint);
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      LOGGER.warn(NODE_CONNECTION_ERROR, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(
-          "can't connect to node "
-              + endPoint
-              + ", please reset longer dn_connection_timeout_ms "
-              + "in iotdb-datanode.properties and restart iotdb.");
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchLocally(TLoadCommandReq loadCommandReq)
-      throws FragmentInstanceDispatchException {
-    final ProgressIndex progressIndex;
-    if (loadCommandReq.isSetProgressIndex()) {
-      progressIndex =
-          ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
-    } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.",
-          progressIndex,
-          loadCommandReq.uuid);
-    }
-
-    final TSStatus resultStatus =
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid,
-                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
-                progressIndex);
-    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
-      throw new FragmentInstanceDispatchException(resultStatus);
-    }
-  }
-
   public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException {
     LOGGER.info("Receive load node from uuid {}.", uuid);
 
@@ -259,6 +190,32 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
     }
   }
 
+  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of exception: %s",
+              loadTsFileReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
     Set<TEndPoint> allEndPoint = new HashSet<>();
@@ -290,6 +247,87 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
     return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
+  private void dispatchLocally(TLoadCommandReq loadCommandReq)
+      throws FragmentInstanceDispatchException {
+    final ProgressIndex progressIndex;
+    if (loadCommandReq.isSetProgressIndex()) {
+      progressIndex =
+          ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    } else {
+      // fallback to use local generated progress index for compatibility
+      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+      LOGGER.info(
+          "Use local generated load progress index {} for uuid {}.",
+          progressIndex,
+          loadCommandReq.uuid);
+    }
+
+    final TSStatus resultStatus =
+        StorageEngine.getInstance()
+            .executeLoadCommand(
+                LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+                loadCommandReq.uuid,
+                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
+                progressIndex);
+    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+      throw new FragmentInstanceDispatchException(resultStatus);
+    }
+  }
+
+  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of exception: %s",
+              loadCommandReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
+  private boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+  }
+
+  private static void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(
+                  Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
+          CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+          LOGGER.info(
+              "Load remote procedure call connection timeout is adjusted to {} ms ({} mins)",
+              newConnectionTimeout,
+              newConnectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
   @Override
   public void abort() {
     // Do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 5bcff3443ec..c8c35e0958c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -903,7 +902,7 @@ public class StorageEngine implements IService {
           status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
           status.setMessage(String.format("Wrong load command %s.", loadCommand));
       }
-    } catch (IOException | LoadFileException e) {
+    } catch (Exception e) {
       LOGGER.error("Execute load command {} error.", loadCommand, e);
       status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
       status.setMessage(e.getMessage());

Reply via email to