This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 399ba485602 Pipe: Fix stuck caused by async connector client not
returned after transferring tsfiles & Fix validateTsFile and
shouldMarkAsPipeRequest may not be effective (#15245)
399ba485602 is described below
commit 399ba4856029e35816397b71292b876782a2d043
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 1 17:27:59 2025 +0800
Pipe: Fix stuck caused by async connector client not returned after
transferring tsfiles & Fix validateTsFile and shouldMarkAsPipeRequest may not
be effective (#15245)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 6 ++-
.../async/IoTDBDataRegionAsyncConnector.java | 14 ++++--
.../async/handler/PipeTransferTsFileHandler.java | 56 ++++++++++++++++++----
3 files changed, 60 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 273fd67fbd0..ed42dfe10ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -102,10 +102,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
receiverAttributes =
String.format(
- "%s-%s-%s",
+ "%s-%s-%s-%s-%s",
Base64.getEncoder().encodeToString((username + ":" + password).getBytes()),
shouldReceiverConvertOnTypeMismatch,
- loadTsFileStrategy);
+ loadTsFileStrategy,
+ validateTsFile,
+ shouldMarkAsPipeRequest);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 4e71c3af431..c5f5758284c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -443,14 +443,11 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
/**
* Transfer queued {@link Event}s which are waiting for retry.
*
- * @throws Exception if an error occurs. The error will be handled by pipe framework, which will
- * retry the {@link Event} and mark the {@link Event} as failure and stop the pipe if the
- * retry times exceeds the threshold.
* @see PipeConnector#transfer(Event) for more details.
* @see PipeConnector#transfer(TabletInsertionEvent) for more details.
* @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
*/
- private void transferQueuedEventsIfNecessary(final boolean forced) throws Exception {
+ private void transferQueuedEventsIfNecessary(final boolean forced) {
if (retryEventQueue.isEmpty()
|| (!forced
&& retryEventQueueEventCounter.getTabletInsertionEventCount()
@@ -511,7 +508,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
}
if (remainingEvents <= retryEventQueue.size()) {
- throw new PipeException("Failed to transfer events in retry queue.");
+ throw new PipeException(
+ "Failed to retry transferring events in the retry queue. Remaining events: "
+ + retryEventQueue.size()
+ + " (tablet events: "
+ + retryEventQueueEventCounter.getTabletInsertionEventCount()
+ + ", tsfile events: "
+ + retryEventQueueEventCounter.getTsFileInsertionEventCount()
+ + ").");
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 5de2ef38948..abbe2d90b76 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -88,7 +88,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
private final AtomicBoolean isSealSignalSent;
private IoTDBDataNodeAsyncClientManager clientManager;
- private AsyncPipeDataTransferServiceClient client;
+ private volatile AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileHandler(
final IoTDBDataRegionAsyncConnector connector,
@@ -149,6 +149,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
this.clientManager = clientManager;
this.client = client;
+ if (client == null) {
+ LOGGER.warn(
+ "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.",
+ connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ tsFile);
+ return;
+ }
+
client.setShouldReturnSelf(false);
client.setTimeoutDynamically(clientManager.getConnectionTimeout());
@@ -225,6 +233,17 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
position += readLength;
}
+ @Override
+ public void onComplete(final TPipeTransferResp response) {
+ try {
+ super.onComplete(response);
+ } finally {
+ if (connector.isClosed()) {
+ returnClientIfNecessary();
+ }
+ }
+ }
+
@Override
protected boolean onCompleteInternal(final TPipeTransferResp response) {
if (isSealSignalSent.get()) {
@@ -284,10 +303,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
referenceCount);
}
- if (client != null) {
- client.setShouldReturnSelf(true);
- client.returnSelf();
- }
+ returnClientIfNecessary();
}
return true;
@@ -326,6 +342,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
return false; // due to seal transfer not yet completed
}
+ @Override
+ public void onError(final Exception exception) {
+ try {
+ super.onError(exception);
+ } finally {
+ returnClientIfNecessary();
+ }
+ }
+
@Override
protected void onErrorInternal(final Exception exception) {
try {
@@ -371,10 +396,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
LOGGER.warn("Failed to close file reader or delete tsFile when failed to transfer file.", e);
} finally {
try {
- if (client != null) {
- client.setShouldReturnSelf(true);
- client.returnSelf();
- }
+ returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
connector.addFailureEventsToRetryQueue(events);
@@ -383,10 +405,26 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
}
}
+ private void returnClientIfNecessary() {
+ if (client != null) {
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
+ client = null;
+ }
+ }
+
@Override
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException {
+ if (client == null) {
+ LOGGER.warn(
+ "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.",
+ connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ tsFile);
+ return;
+ }
+
client.pipeTransfer(req, this);
}