This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-client-not-return in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7876cbb58393dbb4f3acc0376339a16c2aeb6927 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 1 15:26:07 2025 +0800 Pipe: Fix stuck caused by async connector client not returned after transferring tsfiles & Fix validateTsFile and shouldMarkAsPipeRequest may not be effective --- .../client/IoTDBDataNodeAsyncClientManager.java | 6 ++- .../async/IoTDBDataRegionAsyncConnector.java | 14 ++++-- .../async/handler/PipeTransferTsFileHandler.java | 56 ++++++++++++++++++---- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 273fd67fbd0..ed42dfe10ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -102,10 +102,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager receiverAttributes = String.format( - "%s-%s-%s", + "%s-%s-%s-%s-%s", Base64.getEncoder().encodeToString((username + ":" + password).getBytes()), shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile, + shouldMarkAsPipeRequest); synchronized (IoTDBDataNodeAsyncClientManager.class) { if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 4e71c3af431..c5f5758284c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -443,14 +443,11 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { /** * Transfer queued {@link Event}s which are waiting for retry. * - * @throws Exception if an error occurs. The error will be handled by pipe framework, which will - * retry the {@link Event} and mark the {@link Event} as failure and stop the pipe if the - * retry times exceeds the threshold. * @see PipeConnector#transfer(Event) for more details. * @see PipeConnector#transfer(TabletInsertionEvent) for more details. * @see PipeConnector#transfer(TsFileInsertionEvent) for more details. */ - private void transferQueuedEventsIfNecessary(final boolean forced) throws Exception { + private void transferQueuedEventsIfNecessary(final boolean forced) { if (retryEventQueue.isEmpty() || (!forced && retryEventQueueEventCounter.getTabletInsertionEventCount() @@ -511,7 +508,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { } if (remainingEvents <= retryEventQueue.size()) { - throw new PipeException("Failed to transfer events in retry queue."); + throw new PipeException( + "Failed to retry transferring events in the retry queue. Remaining events: " + + retryEventQueue.size() + + " (tablet events: " + + retryEventQueueEventCounter.getTabletInsertionEventCount() + + ", tsfile events: " + + retryEventQueueEventCounter.getTsFileInsertionEventCount() + + ")."); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 5de2ef38948..abbe2d90b76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -88,7 +88,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { private final AtomicBoolean isSealSignalSent; private IoTDBDataNodeAsyncClientManager clientManager; - private AsyncPipeDataTransferServiceClient client; + private volatile AsyncPipeDataTransferServiceClient client; public PipeTransferTsFileHandler( final IoTDBDataRegionAsyncConnector connector, @@ -149,6 +149,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { this.clientManager = clientManager; this.client = client; + if (client == null) { + LOGGER.warn( + "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + connector.isClosed() ? "CLOSED" : "NOT CLOSED", + tsFile); + return; + } + client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); @@ -225,6 +233,17 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { position += readLength; } + @Override + public void onComplete(final TPipeTransferResp response) { + try { + super.onComplete(response); + } finally { + if (connector.isClosed()) { + returnClientIfNecessary(); + } + } + } + @Override protected boolean onCompleteInternal(final TPipeTransferResp response) { if (isSealSignalSent.get()) { @@ -284,10 +303,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { referenceCount); } - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); - } + returnClientIfNecessary(); } return true; @@ -326,6 +342,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { return false; // due to seal transfer not yet completed } + @Override + public void onError(final Exception exception) { + try { + super.onError(exception); + } finally { + returnClientIfNecessary(); + } + } + @Override protected void onErrorInternal(final Exception exception) { try { @@ -371,10 +396,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { LOGGER.warn("Failed to close file reader or delete tsFile when failed to transfer file.", e); } finally { try { - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); - } + returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { connector.addFailureEventsToRetryQueue(events); @@ -383,10 +405,26 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } } + private void returnClientIfNecessary() { + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + client = null; + } + } + @Override protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { + if (client == null) { + LOGGER.warn( + "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + connector.isClosed() ? "CLOSED" : "NOT CLOSED", + tsFile); + return; + } + client.pipeTransfer(req, this); }
