This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 399ba485602 Pipe: Fix a stall caused by the async connector client not 
being returned after transferring tsfiles & fix validateTsFile and 
shouldMarkAsPipeRequest possibly not taking effect (#15245)
399ba485602 is described below

commit 399ba4856029e35816397b71292b876782a2d043
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 1 17:27:59 2025 +0800

    Pipe: Fix a stall caused by the async connector client not being returned 
after transferring tsfiles & fix validateTsFile and shouldMarkAsPipeRequest 
possibly not taking effect (#15245)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    |  6 ++-
 .../async/IoTDBDataRegionAsyncConnector.java       | 14 ++++--
 .../async/handler/PipeTransferTsFileHandler.java   | 56 ++++++++++++++++++----
 3 files changed, 60 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 273fd67fbd0..ed42dfe10ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -102,10 +102,12 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
     receiverAttributes =
         String.format(
-            "%s-%s-%s",
+            "%s-%s-%s-%s-%s",
             Base64.getEncoder().encodeToString((username + ":" + 
password).getBytes()),
             shouldReceiverConvertOnTypeMismatch,
-            loadTsFileStrategy);
+            loadTsFileStrategy,
+            validateTsFile,
+            shouldMarkAsPipeRequest);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 4e71c3af431..c5f5758284c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -443,14 +443,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   /**
    * Transfer queued {@link Event}s which are waiting for retry.
    *
-   * @throws Exception if an error occurs. The error will be handled by pipe 
framework, which will
-   *     retry the {@link Event} and mark the {@link Event} as failure and 
stop the pipe if the
-   *     retry times exceeds the threshold.
    * @see PipeConnector#transfer(Event) for more details.
    * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
-  private void transferQueuedEventsIfNecessary(final boolean forced) throws 
Exception {
+  private void transferQueuedEventsIfNecessary(final boolean forced) {
     if (retryEventQueue.isEmpty()
         || (!forced
             && retryEventQueueEventCounter.getTabletInsertionEventCount()
@@ -511,7 +508,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
         }
 
         if (remainingEvents <= retryEventQueue.size()) {
-          throw new PipeException("Failed to transfer events in retry queue.");
+          throw new PipeException(
+              "Failed to retry transferring events in the retry queue. 
Remaining events: "
+                  + retryEventQueue.size()
+                  + " (tablet events: "
+                  + retryEventQueueEventCounter.getTabletInsertionEventCount()
+                  + ", tsfile events: "
+                  + retryEventQueueEventCounter.getTsFileInsertionEventCount()
+                  + ").");
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 5de2ef38948..abbe2d90b76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -88,7 +88,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   private final AtomicBoolean isSealSignalSent;
 
   private IoTDBDataNodeAsyncClientManager clientManager;
-  private AsyncPipeDataTransferServiceClient client;
+  private volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileHandler(
       final IoTDBDataRegionAsyncConnector connector,
@@ -149,6 +149,14 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.clientManager = clientManager;
     this.client = client;
 
+    if (client == null) {
+      LOGGER.warn(
+          "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
+          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          tsFile);
+      return;
+    }
+
     client.setShouldReturnSelf(false);
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
@@ -225,6 +233,17 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     position += readLength;
   }
 
+  @Override
+  public void onComplete(final TPipeTransferResp response) {
+    try {
+      super.onComplete(response);
+    } finally {
+      if (connector.isClosed()) {
+        returnClientIfNecessary();
+      }
+    }
+  }
+
   @Override
   protected boolean onCompleteInternal(final TPipeTransferResp response) {
     if (isSealSignalSent.get()) {
@@ -284,10 +303,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
               referenceCount);
         }
 
-        if (client != null) {
-          client.setShouldReturnSelf(true);
-          client.returnSelf();
-        }
+        returnClientIfNecessary();
       }
 
       return true;
@@ -326,6 +342,15 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     return false; // due to seal transfer not yet completed
   }
 
+  @Override
+  public void onError(final Exception exception) {
+    try {
+      super.onError(exception);
+    } finally {
+      returnClientIfNecessary();
+    }
+  }
+
   @Override
   protected void onErrorInternal(final Exception exception) {
     try {
@@ -371,10 +396,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       LOGGER.warn("Failed to close file reader or delete tsFile when failed to 
transfer file.", e);
     } finally {
       try {
-        if (client != null) {
-          client.setShouldReturnSelf(true);
-          client.returnSelf();
-        }
+        returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
           connector.addFailureEventsToRetryQueue(events);
@@ -383,10 +405,26 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     }
   }
 
+  private void returnClientIfNecessary() {
+    if (client != null) {
+      client.setShouldReturnSelf(true);
+      client.returnSelf();
+      client = null;
+    }
+  }
+
   @Override
   protected void doTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
+    if (client == null) {
+      LOGGER.warn(
+          "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
+          connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+          tsFile);
+      return;
+    }
+
     client.pipeTransfer(req, this);
   }
 

Reply via email to