This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cc4dc0a9ed0 Pipe: Reduced pipe logs and controlled how often the log 
is printed below a certain frequency (#11973)
cc4dc0a9ed0 is described below

commit cc4dc0a9ed0031a93cb24320ba503a5f2873938f
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 19 19:43:08 2024 +0800

    Pipe: Reduced pipe logs and controlled how often the log is printed below a 
certain frequency (#11973)
    
    - Reduced meta report & tsFile pin & wal pin logs according to log scales
    - Reduced connector event messages to core level
    - Downgraded the retry queue logs in async connector
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 21 ++++++++-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |  9 ++--
 .../PipeTransferTabletBatchEventHandler.java       |  9 +++-
 .../PipeTransferTabletInsertionEventHandler.java   |  4 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  | 34 ++++++++++---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  9 ++++
 .../common/tablet/PipeRawTabletInsertionEvent.java | 13 +++++
 .../db/pipe/resource/PipeResourceManager.java      |  7 +++
 .../iotdb/db/pipe/resource/log/PipeLogManager.java | 40 ++++++++++++++++
 .../iotdb/db/pipe/resource/log/PipeLogStatus.java  | 52 ++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |  5 ++
 .../resource/tsfile/PipeTsFileResourceManager.java | 20 ++++++--
 .../pipe/resource/wal/PipeWALResourceManager.java  | 23 +++++++--
 .../subtask/connector/PipeConnectorSubtask.java    | 12 +++--
 .../dataregion/wal/utils/WALEntryHandler.java      |  5 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 55 ++++++++++++++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       | 31 ++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 33 +++++++++++++
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  4 ++
 19 files changed, 357 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 4f7dde606c8..299814b57af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -276,10 +277,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     try {
+      final Optional<Logger> logger =
+          PipeResourceManager.log()
+              .schedule(
+                  PipeTaskDataNodeAgent.class,
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", 
pipeMeta.coreReportMessage()));
       }
+      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (IOException e) {
       throw new TException(e);
     }
@@ -306,10 +315,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     try {
+      final Optional<Logger> logger =
+          PipeResourceManager.log()
+              .schedule(
+                  PipeTaskDataNodeAgent.class,
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", 
pipeMeta.coreReportMessage()));
       }
+      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (IOException e) {
       throw new TException(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index b182e35adcd..f7d37a343ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -330,8 +330,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             peekedEvent,
             polledEvent);
       }
-      if (polledEvent != null) {
-        LOGGER.info("Polled event {} from retry queue.", polledEvent);
+      if (polledEvent != null && LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Polled event {} from retry queue.", polledEvent);
       }
     }
   }
@@ -354,8 +354,9 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
    */
   public synchronized void addFailureEventToRetryQueue(Event event) {
     retryEventQueue.offer(event);
-
-    LOGGER.info("Added event {} to retry queue.", event);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Added event {} to retry queue.", event);
+    }
   }
 
   //////////////////////////// Operations for close 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 5e9ddf66ca1..1100ed7f697 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<TPipeTransferResp> {
 
@@ -87,7 +88,13 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   public void onError(Exception exception) {
     LOGGER.warn(
         "Failed to transfer TabletInsertionEvent batch {} (request commit 
ids={}).",
-        events,
+        events.stream()
+            .map(
+                event ->
+                    event instanceof EnrichedEvent
+                        ? ((EnrichedEvent) event).coreReportMessage()
+                        : event.toString())
+            .collect(Collectors.toList()),
         requestCommitIds,
         exception);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 6a7f79a3162..9f026c3d915 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -92,7 +92,9 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   public void onError(Exception exception) {
     LOGGER.warn(
         "Failed to transfer TabletInsertionEvent {} (committer key={}, commit 
id={}).",
-        event,
+        event instanceof EnrichedEvent
+            ? ((EnrichedEvent) event).coreReportMessage()
+            : event.toString(),
         event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
         event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() 
: null,
         exception);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 894c5eae035..6b83e0023d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -257,18 +257,40 @@ public abstract class EnrichedEvent implements Event {
         + referenceCount.get()
         + ", pipeName='"
         + pipeName
-        + '\''
-        + ", pipeTaskMeta="
+        + "', pipeTaskMeta="
         + pipeTaskMeta
         + ", committerKey='"
         + committerKey
-        + '\''
-        + ", commitId="
+        + "', commitId="
         + commitId
         + ", pattern='"
         + pattern
-        + '\''
-        + ", startTime="
+        + "', startTime="
+        + startTime
+        + ", endTime="
+        + endTime
+        + ", isPatternParsed="
+        + isPatternParsed
+        + ", isTimeParsed="
+        + isTimeParsed
+        + ", shouldReportOnCommit="
+        + shouldReportOnCommit
+        + '}';
+  }
+
+  public String coreReportMessage() {
+    return "EnrichedEvent{"
+        + "referenceCount="
+        + referenceCount.get()
+        + ", pipeName='"
+        + pipeName
+        + "', committerKey='"
+        + committerKey
+        + "', commitId="
+        + commitId
+        + ", pattern='"
+        + pattern
+        + "', startTime="
         + startTime
         + ", endTime="
         + endTime
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 4ae1b9ee64d..652cda59593 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -255,4 +255,13 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, 
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
+            walEntryHandler, progressIndex, isAligned, isGeneratedByPipe)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index eca75a95cec..1b843265515 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -219,4 +219,17 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, 
sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s}",
+            tablet,
+            isAligned,
+            sourceEvent == null ? "null" : sourceEvent.coreReportMessage(),
+            needToReport,
+            allocatedMemoryBlock)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 56fe8cd6bd0..35b872e40b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.resource;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.log.PipeLogManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -33,6 +34,7 @@ public class PipeResourceManager {
   private final PipeTsFileResourceManager pipeTsFileResourceManager;
   private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
   private final PipeMemoryManager pipeMemoryManager;
+  private final PipeLogManager pipeLogManager;
 
   public static PipeTsFileResourceManager tsfile() {
     return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
@@ -56,12 +58,17 @@ public class PipeResourceManager {
     return PipeResourceManagerHolder.INSTANCE.pipeMemoryManager;
   }
 
+  public static PipeLogManager log() {
+    return PipeResourceManagerHolder.INSTANCE.pipeLogManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
     pipeTsFileResourceManager = new PipeTsFileResourceManager();
     pipeWALResourceManager = new AtomicReference<>();
     pipeMemoryManager = new PipeMemoryManager();
+    pipeLogManager = new PipeLogManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
new file mode 100644
index 00000000000..c7f6e445b6a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeLogManager {
+
+  private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap =
+      new ConcurrentHashMap<>();
+
+  public Optional<Logger> schedule(
+      Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) {
+    return logClass2LogStatusMap
+        .computeIfAbsent(
+            logClass, k -> new PipeLogStatus(logClass, maxAverageScale, 
maxLogInterval))
+        .schedule(scale);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
new file mode 100644
index 00000000000..67355dcc01a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+class PipeLogStatus {
+
+  private final Logger logger;
+
+  private final int maxAverageScale;
+  private final int maxLogInterval;
+  private final AtomicLong currentRounds = new AtomicLong(0);
+
+  PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
+    logger = LoggerFactory.getLogger(logClass);
+
+    this.maxAverageScale = maxAverageScale;
+    this.maxLogInterval = maxLogInterval;
+  }
+
+  synchronized Optional<Logger> schedule(int scale) {
+    if (currentRounds.incrementAndGet()
+        >= Math.min((int) Math.ceil((double) scale / maxAverageScale), 
maxLogInterval)) {
+      currentRounds.set(0);
+      return Optional.of(logger);
+    }
+
+    return Optional.empty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index e0306eea7da..98947caab56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -158,6 +158,11 @@ public class PipeMemoryBlock implements AutoCloseable {
     isReleased = true;
   }
 
+  @Override
+  public String toString() {
+    return "PipeMemoryBlock{" + "memoryUsageInBytes=" + 
memoryUsageInBytes.get() + '}';
+  }
+
   @Override
   public void close() {
     while (true) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 9aadda86ae3..00f5d1c964a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -37,6 +38,7 @@ import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -78,6 +80,14 @@ public class PipeTsFileResourceManager {
   private void ttlCheck() {
     final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
         hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+    final Optional<Logger> logger =
+        PipeResourceManager.log()
+            .schedule(
+                PipeTsFileResourceManager.class,
+                PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds(),
+                hardlinkOrCopiedFileToPipeTsFileResourceMap.size());
+
     while (iterator.hasNext()) {
       final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
 
@@ -85,10 +95,12 @@ public class PipeTsFileResourceManager {
         if (entry.getValue().closeIfOutOfTimeToLive()) {
           iterator.remove();
         } else {
-          LOGGER.info(
-              "Pipe file (file name: {}) is still referenced {} times",
-              entry.getKey(),
-              entry.getValue().getReferenceCount());
+          logger.ifPresent(
+              l ->
+                  l.info(
+                      "Pipe file (file name: {}) is still referenced {} times",
+                      entry.getKey(),
+                      entry.getValue().getReferenceCount()));
         }
       } catch (IOException e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", 
e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index aa8f4e71623..7e0adbacd32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.resource.wal;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
 import org.slf4j.Logger;
@@ -30,6 +32,7 @@ import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -61,6 +64,14 @@ public abstract class PipeWALResourceManager {
   private void ttlCheck() {
     final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
         memtableIdToPipeWALResourceMap.entrySet().iterator();
+    final Optional<Logger> logger =
+        PipeResourceManager.log()
+            .schedule(
+                PipeWALResourceManager.class,
+                PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(),
+                PipeConfig.getInstance().getPipeWalPinMaxLogIntervalRounds(),
+                memtableIdToPipeWALResourceMap.size());
+
     try {
       while (iterator.hasNext()) {
         final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -71,11 +82,13 @@ public abstract class PipeWALResourceManager {
         try {
           if (entry.getValue().invalidateIfPossible()) {
             iterator.remove();
-          } else if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(
-                "WAL (memtableId {}) is still referenced {} times",
-                entry.getKey(),
-                entry.getValue().getReferenceCount());
+          } else {
+            logger.ifPresent(
+                l ->
+                    l.info(
+                        "WAL (memtableId {}) is still referenced {} times",
+                        entry.getKey(),
+                        entry.getValue().getReferenceCount()));
           }
         } finally {
           lock.unlock();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 167ef3e538f..bc30ee2b0d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -75,8 +75,8 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask 
{
   // when no event can be pulled.
   private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
       new PipeHeartbeatEvent("cron", false);
-  private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS =
-      
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+  private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
+      
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
 * 1000;
   private long lastHeartbeatEventInjectTime = System.currentTimeMillis();
 
   public PipeConnectorSubtask(
@@ -131,7 +131,7 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
     try {
       if (event == null) {
         if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
-            > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS) {
+            > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
           transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
         }
         return false;
@@ -166,7 +166,11 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
         throw new PipeException(
             String.format(
                 "Exception in pipe transfer, subtask: %s, last event: %s, root 
cause: %s",
-                taskID, lastEvent, 
ErrorHandlingUtils.getRootCause(e).getMessage()),
+                taskID,
+                lastEvent instanceof EnrichedEvent
+                    ? ((EnrichedEvent) lastEvent).coreReportMessage()
+                    : lastEvent.toString(),
+                ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
         LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 821263c6858..ea9570033ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -33,8 +34,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * This handler is used by the Pipe to find the corresponding insert node. 
Besides, it can try to
- * pin/unpin the wal entries by the memTable id.
+ * This handler is used by the Pipe to find the corresponding {@link 
InsertNode}. Besides, it can
+ * try to pin/unpin the {@link WALEntry}s by the memTable id.
  */
 public class WALEntryHandler {
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 114f759d759..0a3d1381663 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -189,6 +189,13 @@ public class CommonConfig {
   private long pipeMaxAllowedLinkedTsFileCount = 100;
   private long pipeStuckRestartIntervalSeconds = 120;
 
+  private int pipeMetaReportMaxLogNumPerRound = 10;
+  private int pipeMetaReportMaxLogIntervalRounds = 36;
+  private int pipeTsFilePinMaxLogNumPerRound = 10;
+  private int pipeTsFilePinMaxLogIntervalRounds = 90;
+  private int pipeWalPinMaxLogNumPerRound = 10;
+  private int pipeWalPinMaxLogIntervalRounds = 90;
+
   private boolean pipeMemoryManagementEnabled = true;
   private long pipeMemoryAllocateRetryIntervalMs = 1000;
   private int pipeMemoryAllocateMaxRetries = 10;
@@ -787,6 +794,54 @@ public class CommonConfig {
     this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
   }
 
+  public int getPipeMetaReportMaxLogNumPerRound() {
+    return pipeMetaReportMaxLogNumPerRound;
+  }
+
+  public void setPipeMetaReportMaxLogNumPerRound(int 
pipeMetaReportMaxLogNumPerRound) {
+    this.pipeMetaReportMaxLogNumPerRound = pipeMetaReportMaxLogNumPerRound;
+  }
+
+  public int getPipeMetaReportMaxLogIntervalRounds() {
+    return pipeMetaReportMaxLogIntervalRounds;
+  }
+
+  public void setPipeMetaReportMaxLogIntervalRounds(int 
pipeMetaReportMaxLogIntervalRounds) {
+    this.pipeMetaReportMaxLogIntervalRounds = 
pipeMetaReportMaxLogIntervalRounds;
+  }
+
+  public int getPipeTsFilePinMaxLogNumPerRound() {
+    return pipeTsFilePinMaxLogNumPerRound;
+  }
+
+  public void setPipeTsFilePinMaxLogNumPerRound(int 
pipeTsFilePinMaxLogNumPerRound) {
+    this.pipeTsFilePinMaxLogNumPerRound = pipeTsFilePinMaxLogNumPerRound;
+  }
+
+  public int getPipeTsFilePinMaxLogIntervalRounds() {
+    return pipeTsFilePinMaxLogIntervalRounds;
+  }
+
+  public void setPipeTsFilePinMaxLogIntervalRounds(int 
pipeTsFilePinMaxLogIntervalRounds) {
+    this.pipeTsFilePinMaxLogIntervalRounds = pipeTsFilePinMaxLogIntervalRounds;
+  }
+
+  public int getPipeWalPinMaxLogNumPerRound() {
+    return pipeWalPinMaxLogNumPerRound;
+  }
+
+  public void setPipeWalPinMaxLogNumPerRound(int pipeWalPinMaxLogNumPerRound) {
+    this.pipeWalPinMaxLogNumPerRound = pipeWalPinMaxLogNumPerRound;
+  }
+
+  public int getPipeWalPinMaxLogIntervalRounds() {
+    return pipeWalPinMaxLogIntervalRounds;
+  }
+
+  public void setPipeWalPinMaxLogIntervalRounds(int 
pipeWalPinMaxLogIntervalRounds) {
+    this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds;
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f6dfe120db0..5faf8723817 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -444,6 +444,37 @@ public class CommonDescriptor {
                 "pipe_stuck_restart_interval_seconds",
                 String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
 
+    config.setPipeMetaReportMaxLogNumPerRound(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_meta_report_max_log_num_per_round",
+                String.valueOf(config.getPipeMetaReportMaxLogNumPerRound()))));
+    config.setPipeMetaReportMaxLogIntervalRounds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_meta_report_max_log_interval_rounds",
+                
String.valueOf(config.getPipeMetaReportMaxLogIntervalRounds()))));
+    config.setPipeTsFilePinMaxLogNumPerRound(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_tsfile_pin_max_log_num_per_round",
+                String.valueOf(config.getPipeTsFilePinMaxLogNumPerRound()))));
+    config.setPipeTsFilePinMaxLogIntervalRounds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_tsfile_pin_max_log_interval_rounds",
+                
String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+    config.setPipeWalPinMaxLogNumPerRound(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_wal_pin_max_log_num_per_round",
+                String.valueOf(config.getPipeWalPinMaxLogNumPerRound()))));
+    config.setPipeWalPinMaxLogIntervalRounds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_wal_pin_max_log_interval_rounds",
+                String.valueOf(config.getPipeWalPinMaxLogIntervalRounds()))));
+
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 219701f74ee..80223a4439c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -185,6 +185,32 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
   }
 
+  /////////////////////////////// Logger ///////////////////////////////
+
+  public int getPipeMetaReportMaxLogNumPerRound() {
+    return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
+  }
+
+  public int getPipeMetaReportMaxLogIntervalRounds() {
+    return COMMON_CONFIG.getPipeMetaReportMaxLogIntervalRounds();
+  }
+
+  public int getPipeTsFilePinMaxLogNumPerRound() {
+    return COMMON_CONFIG.getPipeTsFilePinMaxLogNumPerRound();
+  }
+
+  public int getPipeTsFilePinMaxLogIntervalRounds() {
+    return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
+  }
+
+  public int getPipeWalPinMaxLogNumPerRound() {
+    return COMMON_CONFIG.getPipeWalPinMaxLogNumPerRound();
+  }
+
+  public int getPipeWalPinMaxLogIntervalRounds() {
+    return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -284,6 +310,13 @@ public class PipeConfig {
     LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", 
getPipeMaxAllowedLinkedTsFileCount());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", 
getPipeStuckRestartIntervalSeconds());
 
+    LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
+    LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
+    LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", 
getPipeTsFilePinMaxLogNumPerRound());
+    LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", 
getPipeTsFilePinMaxLogIntervalRounds());
+    LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", 
getPipeWalPinMaxLogNumPerRound());
+    LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", 
getPipeWalPinMaxLogIntervalRounds());
+
     LOGGER.info("PipeMemoryManagementEnabled: {}", 
getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", 
getPipeMemoryAllocateMaxRetries());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 003eec9bdd8..742d2db7135 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -89,6 +89,10 @@ public class PipeMetaKeeper {
     return pipeNameToPipeMetaMap.values();
   }
 
+  public int getPipeMetaCount() {
+    return pipeNameToPipeMetaMap.size();
+  }
+
   public PipeMeta getPipeMetaByPipeName(String pipeName) {
     return pipeNameToPipeMetaMap.get(pipeName);
   }


Reply via email to