This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/check-shit by this push:
     new 879ef8064b0 opt logs
879ef8064b0 is described below

commit 879ef8064b0d8d6e6e05d0a78e255eecbfbcd01c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jul 11 10:26:06 2025 +0800

    opt logs
---
 .../client/IoTDBDataNodeAsyncClientManager.java    |  8 +++--
 .../async/IoTDBDataRegionAsyncConnector.java       | 38 +++++++++++++++++-----
 .../async/handler/PipeTransferTsFileHandler.java   | 28 +++++++++-------
 .../async/AsyncPipeDataTransferServiceClient.java  | 15 ---------
 .../connector/protocol/IoTDBSslSyncConnector.java  |  2 +-
 5 files changed, 53 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index d8f886f496e..8b23e52710f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -299,15 +299,19 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       client.resetMethodStateIfStopped();
       throw e;
     } finally {
-      client.setShouldReturnSelf(true);
       if (isClosed) {
         try {
           client.close();
           client.invalidateAll();
         } catch (final Exception e) {
-          LOGGER.warn("111");
+          LOGGER.warn(
+              "Failed to close client {}:{} after handshake failure when the manager is closed.",
+              targetNodeUrl.getIp(),
+              targetNodeUrl.getPort(),
+              e);
         }
       }
+      client.setShouldReturnSelf(true);
       client.returnSelf();
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7d88103f815..176bcd94cfb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -431,15 +432,34 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
   private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
       throws Exception {
     transferTsFileCounter.incrementAndGet();
-    AsyncPipeDataTransferServiceClient client = null;
-    try {
-      client = transferTsFileClientManager.borrowClient();
-      pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
-    } catch (final Exception ex) {
-      logOnClientException(client, ex);
-      pipeTransferTsFileHandler.onError(ex);
-    } finally {
-      transferTsFileCounter.decrementAndGet();
+    CompletableFuture<Void> completableFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              AsyncPipeDataTransferServiceClient client = null;
+              try {
+                client = transferTsFileClientManager.borrowClient();
+                pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
+              } catch (final Exception ex) {
+                logOnClientException(client, ex);
+                pipeTransferTsFileHandler.onError(ex);
+              } finally {
+                transferTsFileCounter.decrementAndGet();
+              }
+              return null;
+            },
+            executor);
+
+    if (PipeConfig.getInstance().isTransferTsFileSync()) {
+      try {
+        completableFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
+        throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
+      } catch (Exception e) {
+        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
+        throw e;
+      }
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 8dc74dbf4c3..f56ceeed0d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -431,19 +431,25 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   }
 
   private void returnClientIfNecessary() {
-    if (client != null) {
-      client.setShouldReturnSelf(true);
-      if (connector.isClosed()) {
-        try {
-          client.close();
-          client.invalidateAll();
-        } catch (final Exception e) {
-          LOGGER.warn("111");
-        }
+    if (client == null) {
+      return;
+    }
+
+    if (connector.isClosed()) {
+      try {
+        client.close();
+        client.invalidateAll();
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
+            client,
+            e.getMessage(),
+            e);
       }
-      client.returnSelf();
-      client = null;
     }
+    client.setShouldReturnSelf(true);
+    client.returnSelf();
+    client = null;
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index eadbb4435ab..c7b1aee0d1c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -72,14 +72,12 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
     this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
     this.clientManager = clientManager;
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, constructor", id);
   }
 
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onComplete", id);
   }
 
   @Override
@@ -87,22 +85,18 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
     super.onError(e);
     ThriftClient.resolveException(e, this);
     returnSelf();
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onError", id, e);
   }
 
   @Override
   public void invalidate() {
     if (!hasError()) {
       super.onError(new Exception(String.format("This client %d has been invalidated", id)));
-      LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 1", id);
     }
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 2", id, new Exception());
   }
 
   @Override
   public void invalidateAll() {
     clientManager.clear(endpoint);
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidateAll", id, new Exception());
   }
 
   @Override
@@ -116,14 +110,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
    */
   public void returnSelf() {
     if (shouldReturnSelf.get()) {
-      if (clientManager.isClosed()) {
-        this.close();
-        this.invalidateAll();
-      }
       clientManager.returnClient(endpoint, this);
-      LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 1", id);
     }
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 2", id);
   }
 
   public void setShouldReturnSelf(final boolean shouldReturnSelf) {
@@ -179,13 +167,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
       if (___transport != null && ___transport.isOpen()) {
         ___transport.close();
         LOGGER.warn("Manually closing transport to prevent resource leakage.");
-        LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 1", id);
       }
       ___currentMethod = null;
       LOGGER.info("Method state has been reset due to manager not running.");
-      LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 2", id);
     }
-    LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 3", id);
   }
 
   public String getIp() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 82886e11ffe..44e91874641 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -66,7 +66,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
 
-  protected volatile IoTDBSyncClientManager clientManager;
+  private volatile IoTDBSyncClientManager clientManager;
 
   protected IoTDBSyncClientManager getClientManager() {
     if (clientManager == null) {

Reply via email to