This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sync-huge-tsfile
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6e53e37e43e1423c5140bfa3f99556d049209478
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 8 11:48:28 2024 +0800

    Pipe: dynamically adjust connection timeout to handle SocketTimeoutException
---
 .../async/IoTDBDataRegionAsyncConnector.java       |  2 +-
 .../PipeTransferTsFileInsertionEventHandler.java   | 19 +++++++++--
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  2 ++
 .../pipe/connector/client/IoTDBClientManager.java  | 38 ++++++++++++++++++++++
 .../connector/client/IoTDBSyncClientManager.java   |  2 +-
 5 files changed, 58 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56ad0b97686..02c02b94b5e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -291,7 +291,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient();
-      pipeTransferTsFileInsertionEventHandler.transfer(client);
+      pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
       pipeTransferTsFileInsertionEventHandler.onError(ex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index bcafce049aa..9e426f41702 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
@@ -68,6 +69,7 @@ public class PipeTransferTsFileInsertionEventHandler
 
   private final AtomicBoolean isSealSignalSent;
 
+  private IoTDBDataNodeAsyncClientManager clientManager;
   private AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileInsertionEventHandler(
@@ -93,10 +95,15 @@ public class PipeTransferTsFileInsertionEventHandler
     isSealSignalSent = new AtomicBoolean(false);
   }
 
-  public void transfer(final AsyncPipeDataTransferServiceClient client)
+  public void transfer(
+      IoTDBDataNodeAsyncClientManager clientManager,
+      final AsyncPipeDataTransferServiceClient client)
       throws TException, IOException {
+    this.clientManager = clientManager;
     this.client = client;
+
     client.setShouldReturnSelf(false);
+    client.setTimeout(clientManager.getConnectionTimeout());
 
     final int readLength = reader.read(readBuffer);
 
@@ -110,7 +117,7 @@ public class PipeTransferTsFileInsertionEventHandler
           LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e);
         }
         reader = new RandomAccessFile(tsFile, "r");
-        transfer(client);
+        transfer(clientManager, client);
       } else if (currentFile == tsFile) {
         isSealSignalSent.set(true);
         client.pipeTransfer(
@@ -205,7 +212,7 @@ public class PipeTransferTsFileInsertionEventHandler
         }
       }
 
-      transfer(client);
+      transfer(clientManager, client);
     } catch (final Exception e) {
       onError(e);
     }
@@ -220,6 +227,12 @@ public class PipeTransferTsFileInsertionEventHandler
         event.getCommitId(),
         exception);
 
+    try {
+      clientManager.adjustTimeoutIfNecessary(exception);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
+    }
+
     try {
       if (reader != null) {
         reader.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 9919d954b30..f873cc27646 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -350,6 +350,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
                         modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
             e);
@@ -366,6 +367,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
                         tsFile.getName(), tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
             e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index b645b9110ef..ab459677a91 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -20,11 +20,18 @@
 package org.apache.iotdb.commons.pipe.connector.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
 import java.util.List;
 
 public abstract class IoTDBClientManager {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClientManager.class);
+
   protected final List<TEndPoint> endPointList;
   protected long currentClientIndex = 0;
 
@@ -34,6 +41,10 @@ public abstract class IoTDBClientManager {
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  protected int connectionTimeout =
+      Math.toIntExact(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
   protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) {
     this.endPointList = endPointList;
     this.useLeaderCache = useLeaderCache;
@@ -42,4 +53,31 @@ public abstract class IoTDBClientManager {
   public boolean supportModsIfIsDataNodeReceiver() {
     return supportModsIfIsDataNodeReceiver;
   }
+
+  public void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(Math.toIntExact(connectionTimeout * 2L), MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != connectionTimeout) {
+          connectionTimeout = newConnectionTimeout;
+          LOGGER.info(
+              "Pipe connection timeout is adjusted to {} ms ({} mins)",
+              connectionTimeout,
+              connectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index c262313f24a..bc95db59e62 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen
             resp.getStatus());
       } else {
         clientAndStatus.setRight(true);
-        client.setTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        client.setTimeout(connectionTimeout);
         LOGGER.info(
             "Handshake success. Target server ip: {}, port: {}",
             client.getIpAddress(),

Reply via email to