This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ddeb23b53ce [To dev/1.3] Pipe Log: Reduce repeatable logs (#17700) (#17793)
ddeb23b53ce is described below

commit ddeb23b53cef249d9b2931da866063e489dd382b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 1 16:08:55 2026 +0800

    [To dev/1.3] Pipe Log: Reduce repeatable logs (#17700) (#17793)
    
    Backport the non-i18n log reduction part of b616502aec0b700d0b7f3a1577e9ecee1edc365b.
---
 .../sink/client/IoTDBDataNodeAsyncClientManager.java | 18 ++++++++++--------
 .../PipeConsensusTabletBatchEventHandler.java        | 14 +++++++++-----
 .../PipeConsensusTabletInsertionEventHandler.java    | 20 ++++++++++++++------
 .../PipeConsensusTsFileInsertionEventHandler.java    | 10 ++++++----
 .../thrift/async/IoTDBDataRegionAsyncSink.java       | 20 +++++++++++---------
 .../async/handler/PipeTransferTrackableHandler.java  |  4 +++-
 .../async/handler/PipeTransferTsFileHandler.java     | 13 ++++++++-----
 7 files changed, 61 insertions(+), 38 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 435f890f3a2..e2f6dbec569 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -193,11 +193,12 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         return client;
       }
     } catch (final Exception e) {
-      LOGGER.warn(
-          "failed to borrow client {}:{} for cached leader.",
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Failed to borrow client %s:%s for cached leader.",
           endPoint.getIp(),
-          endPoint.getPort(),
-          e);
+          endPoint.getPort());
     }
 
     return borrowClient();
@@ -342,11 +343,12 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           client.close();
           client.invalidateAll();
         } catch (final Exception e) {
-          LOGGER.warn(
-              "Failed to close client {}:{} after handshake failure when the 
manager is closed.",
+          PipeLogger.log(
+              LOGGER::warn,
+              e,
+              "Failed to close client %s:%s after handshake failure when the 
manager is closed.",
               targetNodeUrl.getIp(),
-              targetNodeUrl.getPort(),
-              e);
+              targetNodeUrl.getPort());
         }
       }
       client.setShouldReturnSelf(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index 56d2c9ad062..32481f43bef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
@@ -116,17 +117,20 @@ public class PipeConsensusTabletBatchEventHandler
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total 
failed events: {}, related pipe names: {}",
-        events.size(),
+    final Object pipeNames =
         events.stream()
             .map(
                 event ->
                     event instanceof EnrichedEvent
                         ? ((EnrichedEvent) event).getPipeName()
                         : "UNKNOWN")
-            .collect(Collectors.toSet()),
-        exception);
+            .collect(Collectors.toSet());
+    PipeLogger.log(
+        LOGGER::warn,
+        exception,
+        "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total 
failed events: %s, related pipe names: %s",
+        events.size(),
+        pipeNames);
 
     connector.addFailureEventsToRetryQueue(events);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 9d027e711f3..91c083fe3c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import org.apache.iotdb.db.pipe.consensus.PipeConsensusSinkMetrics;
@@ -106,14 +107,21 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
 
   @Override
   public void onError(Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (committer key={}, commit 
id={}).",
+    final Object eventReportMessage =
         event instanceof EnrichedEvent
             ? ((EnrichedEvent) event).coreReportMessage()
-            : event.toString(),
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
-        event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() 
: null,
-        exception);
+            : event.toString();
+    final Object committerKey =
+        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null;
+    final Object commitId =
+        event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() 
: null;
+    PipeLogger.log(
+        LOGGER::warn,
+        exception,
+        "Failed to transfer TabletInsertionEvent %s (committer key=%s, commit 
id=%s).",
+        eventReportMessage,
+        committerKey,
+        commitId);
 
     connector.addFailureEventToRetryQueue(event);
     metric.recordRetryCounter();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 22b55239e19..3496c0cf857 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
 import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
@@ -273,12 +274,13 @@ public class PipeConsensusTsFileInsertionEventHandler
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit 
id {}).",
+    PipeLogger.log(
+        LOGGER::warn,
+        exception,
+        "Failed to transfer TsFileInsertionEvent %s (committer key %s, commit 
id %s).",
         tsFile,
         event.getCommitterKey(),
-        event.getCommitId(),
-        exception);
+        event.getCommitId());
 
     try {
       if (reader != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index aaef2ae435f..9d0c1563c78 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -258,7 +258,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   false));
         }
       } catch (final Exception e) {
-        LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e);
+        PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch 
(%s).", sealedFiles);
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
           addFailureEventsToRetryQueue(events, e);
         }
@@ -437,17 +437,19 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       } catch (final Exception e) {
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
-          LOGGER.warn(
-              "Transfer tsfile event {} asynchronously was interrupted.",
-              pipeTransferTsFileHandler.getTsFile(),
-              e);
+          PipeLogger.log(
+              LOGGER::warn,
+              e,
+              "Transfer tsfile event %s asynchronously was interrupted.",
+              pipeTransferTsFileHandler.getTsFile());
         }
 
         pipeTransferTsFileHandler.onError(e);
-        LOGGER.warn(
-            "Failed to transfer tsfile event {} asynchronously.",
-            pipeTransferTsFileHandler.getTsFile(),
-            e);
+        PipeLogger.log(
+            LOGGER::warn,
+            e,
+            "Failed to transfer tsfile event %s asynchronously.",
+            pipeTransferTsFileHandler.getTsFile());
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index a0e6ad73fe7..40b05066a93 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -105,7 +106,8 @@ public abstract class PipeTransferTrackableHandler
       client.returnSelf(
           (e) -> {
             if (e instanceof IllegalStateException) {
-              LOGGER.info(
+              PipeLogger.log(
+                  LOGGER::info,
                   "Illegal state when return the client to object pool, maybe 
the pool is already cleared. Will ignore.");
               return true;
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index c79a09ec239..8d9648f5292 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -157,8 +157,9 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.client = client;
 
     if (client == null) {
-      LOGGER.warn(
-          "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
+      PipeLogger.log(
+          LOGGER::warn,
+          "Client has been returned to the pool. Current handler status is %s. 
Will not transfer %s.",
           sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
@@ -420,7 +421,8 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     client.returnSelf(
         (e) -> {
           if (e instanceof IllegalStateException) {
-            LOGGER.info(
+            PipeLogger.log(
+                LOGGER::info,
                 "Illegal state when return the client to object pool, maybe 
the pool is already cleared. Will ignore.");
             return true;
           }
@@ -434,8 +436,9 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
     if (client == null) {
-      LOGGER.warn(
-          "Client has been returned to the pool. Current handler status is {}. 
Will not transfer {}.",
+      PipeLogger.log(
+          LOGGER::warn,
+          "Client has been returned to the pool. Current handler status is %s. 
Will not transfer %s.",
           sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;

Reply via email to