This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d675ba14ab5 Pipe: Globally adjust timeout when syncing huge tsfiles &
Speed up file transfer after sync task failed (#12491)
d675ba14ab5 is described below
commit d675ba14ab53d57df6cc3c115afc36a5781f78a9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu May 9 22:25:30 2024 +0800
Pipe: Globally adjust timeout when syncing huge tsfiles & Speed up file
transfer after sync task failed (#12491)
---
.../async/IoTDBDataRegionAsyncConnector.java | 35 +++++++++++++++++-----
.../PipeTransferTsFileInsertionEventHandler.java | 6 ++--
.../scheduler/load/LoadTsFileDispatcherImpl.java | 19 +++++++-----
.../async/AsyncPipeDataTransferServiceClient.java | 10 +++++++
.../pipe/connector/client/IoTDBClientManager.java | 24 ++++++++++-----
.../connector/client/IoTDBSyncClientManager.java | 2 +-
.../commons/pipe/progress/PipeEventCommitter.java | 17 +++++++++++
7 files changed, 87 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 02c02b94b5e..1e01b824220 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -161,6 +161,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
+ transferWithoutCheck(tabletInsertionEvent);
+ }
+
+ private void transferWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
+ throws Exception {
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
@@ -265,6 +270,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
+ transferWithoutCheck(tsFileInsertionEvent);
+ }
+
+ private void transferWithoutCheck(final TsFileInsertionEvent
tsFileInsertionEvent)
+ throws Exception {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
// We increase the reference count for this event to determine if the
event may be released.
@@ -275,15 +285,23 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
- // Just in case. To avoid the case that exception occurred when
constructing the handler.
- if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
- throw new
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
- }
+ // We assume that no exceptions will be thrown after reference count is
increased.
+ try {
+ // Just in case. To avoid the case that exception occurred when
constructing the handler.
+ if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
+ throw new
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+ }
- final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
- new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent,
this);
+ final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
+ new
PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
- transfer(pipeTransferTsFileInsertionEventHandler);
+ transfer(pipeTransferTsFileInsertionEventHandler);
+ } catch (Exception e) {
+ // Just in case. To avoid the case that exception occurred when
constructing the handler.
+ pipeTsFileInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ throw e;
+ }
}
private void transfer(
@@ -350,7 +368,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
} else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
} else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
- retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+ // Using the async connector to transfer the event for performance.
+ transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
} else {
LOGGER.warn(
"IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", peekedEvent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 9e426f41702..213d980c186 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -103,7 +103,7 @@ public class PipeTransferTsFileInsertionEventHandler
this.client = client;
client.setShouldReturnSelf(false);
- client.setTimeout(clientManager.getConnectionTimeout());
+ client.setTimeoutDynamically(clientManager.getConnectionTimeout());
final int readLength = reader.read(readBuffer);
@@ -228,7 +228,9 @@ public class PipeTransferTsFileInsertionEventHandler
exception);
try {
- clientManager.adjustTimeoutIfNecessary(exception);
+ if (Objects.nonNull(clientManager)) {
+ clientManager.adjustTimeoutIfNecessary(exception);
+ }
} catch (Exception e) {
LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 83a12e0d637..a630c94797b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -70,7 +71,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; //
1 day
- private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+ private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000;
// 6 hours
+ private static final AtomicInteger CONNECTION_TIMEOUT_MS =
new
AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
private String uuid;
@@ -194,7 +196,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
if (!loadResp.isAccepted()) {
@@ -278,7 +280,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
if (!loadResp.isAccepted()) {
@@ -306,18 +308,21 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private static void adjustTimeoutIfNecessary(Throwable e) {
do {
- if (e instanceof SocketTimeoutException) {
+ if (e instanceof SocketTimeoutException || e instanceof
TimeoutException) {
int newConnectionTimeout;
try {
newConnectionTimeout =
Math.min(
- Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L),
MAX_CONNECTION_TIMEOUT_MS);
+ Math.max(
+ FIRST_ADJUSTMENT_TIMEOUT_MS,
+ Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+ MAX_CONNECTION_TIMEOUT_MS);
} catch (ArithmeticException arithmeticException) {
newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
}
- if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
- CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+ if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+ CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
LOGGER.info(
"Load remote procedure call connection timeout is adjusted to {}
ms ({} mins)",
newConnectionTimeout,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 2b8110bc7bc..186db8ede4c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.transport.TNonblockingSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,6 +118,15 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
this.shouldReturnSelf.set(shouldReturnSelf);
}
+ public void setTimeoutDynamically(int timeout) {
+ try {
+ ((TNonblockingSocket) ___transport).setTimeout(timeout);
+ } catch (Exception e) {
+ setTimeout(timeout);
+ LOGGER.error("Failed to set timeout dynamically, set it statically", e);
+ }
+ }
+
private void close() {
___transport.close();
___currentMethod = null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index bb1f0062363..73e0543fe67 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import java.net.SocketTimeoutException;
import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class IoTDBClientManager {
@@ -42,7 +44,9 @@ public abstract class IoTDBClientManager {
protected boolean supportModsIfIsDataNodeReceiver = true;
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; //
1 day
- protected int connectionTimeout =
PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+ private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000;
// 6 hours
+ protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
+ new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
protected IoTDBClientManager(List<TEndPoint> endPointList, boolean
useLeaderCache) {
this.endPointList = endPointList;
@@ -55,21 +59,25 @@ public abstract class IoTDBClientManager {
public void adjustTimeoutIfNecessary(Throwable e) {
do {
- if (e instanceof SocketTimeoutException) {
+ if (e instanceof SocketTimeoutException || e instanceof
TimeoutException) {
int newConnectionTimeout;
try {
newConnectionTimeout =
- Math.min(Math.toIntExact(connectionTimeout * 2L),
MAX_CONNECTION_TIMEOUT_MS);
+ Math.min(
+ Math.max(
+ FIRST_ADJUSTMENT_TIMEOUT_MS,
+ Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+ MAX_CONNECTION_TIMEOUT_MS);
} catch (ArithmeticException arithmeticException) {
newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
}
- if (newConnectionTimeout != connectionTimeout) {
- connectionTimeout = newConnectionTimeout;
+ if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+ CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
LOGGER.info(
"Pipe connection timeout is adjusted to {} ms ({} mins)",
- connectionTimeout,
- connectionTimeout / 60000.0);
+ newConnectionTimeout,
+ newConnectionTimeout / 60000.0);
}
return;
}
@@ -77,6 +85,6 @@ public abstract class IoTDBClientManager {
}
public int getConnectionTimeout() {
- return connectionTimeout;
+ return CONNECTION_TIMEOUT_MS.get();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index a29d7bdb0a2..667313e1976 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
resp.getStatus());
} else {
clientAndStatus.setRight(true);
- client.setTimeout(connectionTimeout);
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
LOGGER.info(
"Handshake success. Target server ip: {}, port: {}",
client.getIpAddress(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index eeec9c8be25..fb0830dd681 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -63,6 +63,15 @@ public class PipeEventCommitter {
public synchronized void commit(EnrichedEvent event) {
commitQueue.offer(event);
+ LOGGER.info(
+ "COMMIT QUEUE OFFER: pipe name {}, creation time {}, region id {},
event commit id {}, last commit id {}, commit queue size {}",
+ pipeName,
+ creationTime,
+ dataRegionId,
+ event.getCommitId(),
+ lastCommitId.get(),
+ commitQueue.size());
+
while (!commitQueue.isEmpty()) {
final EnrichedEvent e = commitQueue.peek();
@@ -84,6 +93,14 @@ public class PipeEventCommitter {
e.onCommitted();
lastCommitId.incrementAndGet();
commitQueue.poll();
+
+ LOGGER.info(
+ "COMMIT QUEUE POLL: pipe name {}, creation time {}, region id {},
last commit id {}, commit queue size after commit {}",
+ pipeName,
+ creationTime,
+ dataRegionId,
+ lastCommitId.get(),
+ commitQueue.size());
}
}