This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sync-huge-tsfile in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6e53e37e43e1423c5140bfa3f99556d049209478 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed May 8 11:48:28 2024 +0800 Pipe: dynamically adjust connection timeout to handle SocketTimeoutException --- .../async/IoTDBDataRegionAsyncConnector.java | 2 +- .../PipeTransferTsFileInsertionEventHandler.java | 19 +++++++++-- .../thrift/sync/IoTDBDataRegionSyncConnector.java | 2 ++ .../pipe/connector/client/IoTDBClientManager.java | 38 ++++++++++++++++++++++ .../connector/client/IoTDBSyncClientManager.java | 2 +- 5 files changed, 58 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 56ad0b97686..02c02b94b5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -291,7 +291,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { AsyncPipeDataTransferServiceClient client = null; try { client = clientManager.borrowClient(); - pipeTransferTsFileInsertionEventHandler.transfer(client); + pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client); } catch (final Exception ex) { logOnClientException(client, ex); pipeTransferTsFileInsertionEventHandler.onError(ex); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java index bcafce049aa..9e426f41702 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp; +import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq; @@ -68,6 +69,7 @@ public class PipeTransferTsFileInsertionEventHandler private final AtomicBoolean isSealSignalSent; + private IoTDBDataNodeAsyncClientManager clientManager; private AsyncPipeDataTransferServiceClient client; public PipeTransferTsFileInsertionEventHandler( @@ -93,10 +95,15 @@ public class PipeTransferTsFileInsertionEventHandler isSealSignalSent = new AtomicBoolean(false); } - public void transfer(final AsyncPipeDataTransferServiceClient client) + public void transfer( + IoTDBDataNodeAsyncClientManager clientManager, + final AsyncPipeDataTransferServiceClient client) throws TException, IOException { + this.clientManager = clientManager; this.client = client; + client.setShouldReturnSelf(false); + client.setTimeout(clientManager.getConnectionTimeout()); final int readLength = reader.read(readBuffer); @@ -110,7 +117,7 @@ public class PipeTransferTsFileInsertionEventHandler LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e); } reader = new 
RandomAccessFile(tsFile, "r"); - transfer(client); + transfer(clientManager, client); } else if (currentFile == tsFile) { isSealSignalSent.set(true); client.pipeTransfer( @@ -205,7 +212,7 @@ public class PipeTransferTsFileInsertionEventHandler } } - transfer(client); + transfer(clientManager, client); } catch (final Exception e) { onError(e); } @@ -220,6 +227,12 @@ public class PipeTransferTsFileInsertionEventHandler event.getCommitId(), exception); + try { + clientManager.adjustTimeoutIfNecessary(exception); + } catch (Exception e) { + LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e); + } + try { if (reader != null) { reader.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 9919d954b30..f873cc27646 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -350,6 +350,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())); } catch (final Exception e) { clientAndStatus.setRight(false); + clientManager.adjustTimeoutIfNecessary(e); throw new PipeConnectionException( String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e); @@ -366,6 +367,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { tsFile.getName(), tsFile.length())); } catch (final Exception e) { clientAndStatus.setRight(false); + clientManager.adjustTimeoutIfNecessary(e); throw new PipeConnectionException( String.format("Network error when seal file %s, because %s.", tsFile, 
e.getMessage()), e); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java index b645b9110ef..ab459677a91 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java @@ -20,11 +20,18 @@ package org.apache.iotdb.commons.pipe.connector.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketTimeoutException; import java.util.List; public abstract class IoTDBClientManager { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClientManager.class); + protected final List<TEndPoint> endPointList; protected long currentClientIndex = 0; @@ -34,6 +41,10 @@ public abstract class IoTDBClientManager { // it is a DataNode receiver. The flag is useless for configNode receiver. 
protected boolean supportModsIfIsDataNodeReceiver = true; + private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day + protected volatile int connectionTimeout = + (int) Math.min(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs(), Integer.MAX_VALUE); + protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) { this.endPointList = endPointList; this.useLeaderCache = useLeaderCache; @@ -42,4 +53,31 @@ public abstract class IoTDBClientManager { public boolean supportModsIfIsDataNodeReceiver() { return supportModsIfIsDataNodeReceiver; } + + /** Doubles the connection timeout, up to 1 day, if {@code e} or any cause is a socket timeout. */ + public void adjustTimeoutIfNecessary(Throwable e) { + // Walk the whole cause chain, since the SocketTimeoutException may arrive wrapped. + for (Throwable cause = e; cause != null; cause = cause.getCause()) { + if (cause instanceof SocketTimeoutException) { + final int currentTimeout = connectionTimeout; + // Long arithmetic cannot overflow, and max() keeps a timeout that was configured + // above the cap from being lowered by this "adjustment". + final long cappedDoubled = Math.min(currentTimeout * 2L, MAX_CONNECTION_TIMEOUT_MS); + final int newConnectionTimeout = (int) Math.max(currentTimeout, cappedDoubled); + if (newConnectionTimeout != currentTimeout) { + connectionTimeout = newConnectionTimeout; + LOGGER.info( + "Pipe connection timeout is adjusted to {} ms ({} mins)", + newConnectionTimeout, + newConnectionTimeout / 60000.0); + } + return; + } + } + } + + /** Returns the current, possibly adjusted, connection timeout in milliseconds. */ + public int getConnectionTimeout() { + return connectionTimeout; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java index c262313f24a..bc95db59e62 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java @@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen resp.getStatus()); } else { clientAndStatus.setRight(true); - client.setTimeout((int)
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); + client.setTimeout(connectionTimeout); LOGGER.info( "Handshake success. Target server ip: {}, port: {}", client.getIpAddress(),
