This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pr/11973
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b2d747e843ae8496a1a81fc18534479e18990d2d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Feb 19 17:49:04 2024 +0800

    refactor
---
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 35 +++++++++++-----------
 .../iotdb/db/pipe/resource/log/PipeLogManager.java | 16 ++++++----
 .../iotdb/db/pipe/resource/log/PipeLogStatus.java  | 18 ++++++++---
 .../resource/tsfile/PipeTsFileResourceManager.java | 23 +++++++-------
 .../pipe/resource/wal/PipeWALResourceManager.java  | 23 ++++++++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 18 +++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       | 10 +++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 17 +++++++++++
 8 files changed, 115 insertions(+), 45 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 7895200a1dc..299814b57af 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -71,14 +72,6 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
   protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
-  public PipeTaskDataNodeAgent() {
-    PipeResourceManager.log()
-        .register(
-            PipeTaskDataNodeAgent.class,
-            PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-            PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds());
-  }
-
   ////////////////////////// Pipe Task Management Entry //////////////////////////
 
   @Override
@@ -284,15 +277,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     try {
-      boolean printThisRound =
+      final Optional<Logger> logger =
           PipeResourceManager.log()
-              .schedule(PipeTaskDataNodeAgent.class, pipeMetaKeeper.getPipeMetaCount());
+              .schedule(
+                  PipeTaskDataNodeAgent.class,
+                  PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        if (LOGGER.isInfoEnabled() && printThisRound) {
-          LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
-        }
+        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()));
       }
+      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (IOException e) {
       throw new TException(e);
     }
@@ -319,15 +315,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     try {
-      boolean printThisRound =
+      final Optional<Logger> logger =
           PipeResourceManager.log()
-              .schedule(PipeTaskDataNodeAgent.class, pipeMetaKeeper.getPipeMetaCount());
+              .schedule(
+                  PipeTaskDataNodeAgent.class,
+                  PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        if (LOGGER.isInfoEnabled() && printThisRound) {
-          LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
-        }
+        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()));
       }
+      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (IOException e) {
       throw new TException(e);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
index a032ab1c65b..c7f6e445b6a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
@@ -19,18 +19,22 @@
 
 package org.apache.iotdb.db.pipe.resource.log;
 
+import org.slf4j.Logger;
+
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 public class PipeLogManager {
+
   private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap =
       new ConcurrentHashMap<>();
 
-  public void register(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
-    logClass2LogStatusMap.put(logClass, new PipeLogStatus(maxAverageScale, maxLogInterval));
-  }
-
-  public boolean schedule(Class<?> logClass, int scale) {
-    return logClass2LogStatusMap.get(logClass).schedule(scale);
+  public Optional<Logger> schedule(
+      Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) {
+    return logClass2LogStatusMap
+        .computeIfAbsent(
+            logClass, k -> new PipeLogStatus(logClass, maxAverageScale, maxLogInterval))
+        .schedule(scale);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
index 53cd28a79ad..67355dcc01a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
@@ -19,24 +19,34 @@
 
 package org.apache.iotdb.db.pipe.resource.log;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 class PipeLogStatus {
+
+  private final Logger logger;
+
   private final int maxAverageScale;
   private final int maxLogInterval;
   private final AtomicLong currentRounds = new AtomicLong(0);
 
-  PipeLogStatus(int maxAverageScale, int maxLogInterval) {
+  PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
+    logger = LoggerFactory.getLogger(logClass);
+
     this.maxAverageScale = maxAverageScale;
     this.maxLogInterval = maxLogInterval;
   }
 
-  boolean schedule(int scale) {
+  synchronized Optional<Logger> schedule(int scale) {
     if (currentRounds.incrementAndGet()
         >= Math.min((int) Math.ceil((double) scale / maxAverageScale), maxLogInterval)) {
       currentRounds.set(0);
-      return true;
+      return Optional.of(logger);
     }
-    return false;
+
+    return Optional.empty();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index d26ba1df007..00f5d1c964a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -38,6 +38,7 @@ import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -56,11 +57,6 @@ public class PipeTsFileResourceManager {
             "PipeTsFileResourceManager#ttlCheck()",
             this::tryTtlCheck,
             Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000, 1));
-    PipeResourceManager.log()
-        .register(
-            PipeTsFileResourceManager.class,
-            PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
-            PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds());
   }
 
   private void tryTtlCheck() {
@@ -84,11 +80,14 @@ public class PipeTsFileResourceManager {
   private void ttlCheck() {
     final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
         hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
-    boolean printThisRound =
+    final Optional<Logger> logger =
         PipeResourceManager.log()
             .schedule(
                 PipeTsFileResourceManager.class,
+                PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
+                PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds(),
                 hardlinkOrCopiedFileToPipeTsFileResourceMap.size());
+
     while (iterator.hasNext()) {
       final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
 
@@ -96,12 +95,12 @@ public class PipeTsFileResourceManager {
         if (entry.getValue().closeIfOutOfTimeToLive()) {
           iterator.remove();
         } else {
-          if (printThisRound) {
-            LOGGER.info(
-                "Pipe file (file name: {}) is still referenced {} times",
-                entry.getKey(),
-                entry.getValue().getReferenceCount());
-          }
+          logger.ifPresent(
+              l ->
+                  l.info(
+                      "Pipe file (file name: {}) is still referenced {} times",
+                      entry.getKey(),
+                      entry.getValue().getReferenceCount()));
         }
       } catch (IOException e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index aa8f4e71623..7e0adbacd32 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.resource.wal;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
 import org.slf4j.Logger;
@@ -30,6 +32,7 @@ import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -61,6 +64,14 @@ public abstract class PipeWALResourceManager {
   private void ttlCheck() {
     final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
         memtableIdToPipeWALResourceMap.entrySet().iterator();
+    final Optional<Logger> logger =
+        PipeResourceManager.log()
+            .schedule(
+                PipeWALResourceManager.class,
+                PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(),
+                PipeConfig.getInstance().getPipeWalPinMaxLogIntervalRounds(),
+                memtableIdToPipeWALResourceMap.size());
+
     try {
       while (iterator.hasNext()) {
         final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -71,11 +82,13 @@ public abstract class PipeWALResourceManager {
         try {
           if (entry.getValue().invalidateIfPossible()) {
             iterator.remove();
-          } else if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(
-                "WAL (memtableId {}) is still referenced {} times",
-                entry.getKey(),
-                entry.getValue().getReferenceCount());
+          } else {
+            logger.ifPresent(
+                l ->
+                    l.info(
+                        "WAL (memtableId {}) is still referenced {} times",
+                        entry.getKey(),
+                        entry.getValue().getReferenceCount()));
           }
         } finally {
           lock.unlock();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 10d331693f7..0a3d1381663 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -193,6 +193,8 @@ public class CommonConfig {
   private int pipeMetaReportMaxLogIntervalRounds = 36;
   private int pipeTsFilePinMaxLogNumPerRound = 10;
   private int pipeTsFilePinMaxLogIntervalRounds = 90;
+  private int pipeWalPinMaxLogNumPerRound = 10;
+  private int pipeWalPinMaxLogIntervalRounds = 90;
 
   private boolean pipeMemoryManagementEnabled = true;
   private long pipeMemoryAllocateRetryIntervalMs = 1000;
@@ -824,6 +826,22 @@ public class CommonConfig {
     this.pipeTsFilePinMaxLogIntervalRounds = pipeTsFilePinMaxLogIntervalRounds;
   }
 
+  public int getPipeWalPinMaxLogNumPerRound() {
+    return pipeWalPinMaxLogNumPerRound;
+  }
+
+  public void setPipeWalPinMaxLogNumPerRound(int pipeWalPinMaxLogNumPerRound) {
+    this.pipeWalPinMaxLogNumPerRound = pipeWalPinMaxLogNumPerRound;
+  }
+
+  public int getPipeWalPinMaxLogIntervalRounds() {
+    return pipeWalPinMaxLogIntervalRounds;
+  }
+
+  public void setPipeWalPinMaxLogIntervalRounds(int pipeWalPinMaxLogIntervalRounds) {
+    this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds;
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c145f98b882..5faf8723817 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -464,6 +464,16 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_tsfile_pin_max_log_interval_rounds",
                 String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+    config.setPipeWalPinMaxLogNumPerRound(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_wal_pin_max_log_num_per_round",
+                String.valueOf(config.getPipeWalPinMaxLogNumPerRound()))));
+    config.setPipeWalPinMaxLogIntervalRounds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_wal_pin_max_log_interval_rounds",
+                String.valueOf(config.getPipeWalPinMaxLogIntervalRounds()))));
 
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9d7928df310..80223a4439c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -185,6 +185,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
   }
 
+  /////////////////////////////// Logger ///////////////////////////////
+
   public int getPipeMetaReportMaxLogNumPerRound() {
     return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
   }
@@ -201,6 +203,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
   }
 
+  public int getPipeWalPinMaxLogNumPerRound() {
+    return COMMON_CONFIG.getPipeWalPinMaxLogNumPerRound();
+  }
+
+  public int getPipeWalPinMaxLogIntervalRounds() {
+    return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -300,6 +310,13 @@ public class PipeConfig {
     LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());
 
+    LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
+    LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
+    LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", getPipeTsFilePinMaxLogNumPerRound());
+    LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds());
+    LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", getPipeWalPinMaxLogNumPerRound());
+    LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", getPipeWalPinMaxLogIntervalRounds());
+
     LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());
     LOGGER.info(

Reply via email to