This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-client-not-return-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 39854d7eae9451c3ec3bea88b0a1039b7c0dd96f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 1 17:27:59 2025 +0800

    Pipe: Fix stuck caused by async connector client not returned after transferring tsfiles & Fix validateTsFile and shouldMarkAsPipeRequest may not be effective (#15245)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    |  6 ++-
 .../async/IoTDBDataRegionAsyncConnector.java       | 12 +++--
 .../async/handler/PipeTransferTsFileHandler.java   | 56 ++++++++++++++++++----
 3 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 79c56fa6890..aa5bdd5112f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -102,10 +102,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
     receiverAttributes =
         String.format(
-            "%s-%s-%s",
+            "%s-%s-%s-%s-%s",
             Base64.getEncoder().encodeToString((username + ":" + password).getBytes()),
             shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy);
+            loadTsFileStrategy,
+            validateTsFile,
+            shouldMarkAsPipeRequest);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 3ca9635fb85..b8ea083a113 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -460,9 +460,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
   /**
    * Transfer queued {@link Event}s which are waiting for retry.
    *
-   * @throws Exception if an error occurs. The error will be handled by pipe framework, which will
-   *     retry the {@link Event} and mark the {@link Event} as failure and stop the pipe if the
-   *     retry times exceeds the threshold.
    * @see PipeConnector#transfer(Event) for more details.
    * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
@@ -528,7 +525,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
         }
 
         if (remainingEvents <= retryEventQueue.size()) {
-          throw new PipeException("Failed to transfer events in retry queue.");
+          throw new PipeException(
+              "Failed to retry transferring events in the retry queue. Remaining events: "
+                  + retryEventQueue.size()
+                  + " (tablet events: "
+                  + retryEventQueueEventCounter.getTabletInsertionEventCount()
+                  + ", tsfile events: "
+                  + retryEventQueueEventCounter.getTsFileInsertionEventCount()
+                  + ").");
         }
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 6698ebc0ca3..0657e69e68e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -89,7 +89,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   private final AtomicBoolean isSealSignalSent;
 
   private IoTDBDataNodeAsyncClientManager clientManager;
-  private AsyncPipeDataTransferServiceClient client;
+  private volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileHandler(
       final IoTDBDataRegionAsyncConnector connector,
@@ -152,6 +152,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     this.clientManager = clientManager;
     this.client = client;
 
+    if (client == null) {
+      LOGGER.warn(
+          "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.",
+          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          tsFile);
+      return;
+    }
+
     client.setShouldReturnSelf(false);
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
@@ -233,6 +241,17 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     position += readLength;
   }
 
+  @Override
+  public void onComplete(final TPipeTransferResp response) {
+    try {
+      super.onComplete(response);
+    } finally {
+      if (connector.isClosed()) {
+        returnClientIfNecessary();
+      }
+    }
+  }
+
   @Override
   protected boolean onCompleteInternal(final TPipeTransferResp response) {
     if (isSealSignalSent.get()) {
@@ -292,10 +311,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
               referenceCount);
         }
 
-        if (client != null) {
-          client.setShouldReturnSelf(true);
-          client.returnSelf();
-        }
+        returnClientIfNecessary();
       }
 
       return true;
@@ -334,6 +350,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     return false; // due to seal transfer not yet completed
   }
 
+  @Override
+  public void onError(final Exception exception) {
+    try {
+      super.onError(exception);
+    } finally {
+      returnClientIfNecessary();
+    }
+  }
+
   @Override
   protected void onErrorInternal(final Exception exception) {
     try {
@@ -379,10 +404,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
       LOGGER.warn("Failed to close file reader or delete tsFile when failed to transfer file.", e);
     } finally {
       try {
-        if (client != null) {
-          client.setShouldReturnSelf(true);
-          client.returnSelf();
-        }
+        returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
           connector.addFailureEventsToRetryQueue(events);
@@ -391,10 +413,26 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     }
   }
 
+  private void returnClientIfNecessary() {
+    if (client != null) {
+      client.setShouldReturnSelf(true);
+      client.returnSelf();
+      client = null;
+    }
+  }
+
   @Override
   protected void doTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
       throws TException {
+    if (client == null) {
+      LOGGER.warn(
+          "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.",
+          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          tsFile);
+      return;
+    }
+
     client.pipeTransfer(req, this);
   }
 

Reply via email to