This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 77e87e494fc [To dev/1.3] Pipe: Reduced the progress index report interval & Added some logs (#15905) (#15908)
77e87e494fc is described below

commit 77e87e494fcc20e45e180886c782f14cdf3bd499
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 19:24:46 2025 +0800

    [To dev/1.3] Pipe: Reduced the progress index report interval & Added some logs (#15905) (#15908)
    
    * partial
    
    * Changed default
    
    * Update PipeDataNodeTaskAgent.java
    
    * Next
---
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   | 20 ++++++------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 37 ++++++++++++----------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 +++---
 .../iotdb/commons/pipe/config/PipeConfig.java      |  2 +-
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  2 +-
 .../commons/pipe/resource/log/PipeLogManager.java  |  2 +-
 .../commons/pipe/resource/log/PipeLogStatus.java   |  4 +--
 7 files changed, 39 insertions(+), 38 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index b115ba25b4a..ae20141c81f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -205,21 +205,19 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
     if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
       return;
     }
-
-    LOGGER.info("Received pipe heartbeat request {} from config coordinator.", 
req.heartbeatId);
+    final Optional<Logger> logger =
+        PipeConfigNodeResourceManager.log()
+            .schedule(
+                PipeConfigNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
+    LOGGER.debug("Received pipe heartbeat request {} from config 
coordinator.", req.heartbeatId);
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeConfigNodeResourceManager.log()
-              .schedule(
-                  PipeConfigNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
-
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -242,7 +240,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
                     remainingEventCount,
                     estimatedRemainingTime));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException e) {
       throw new TException(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 711b305285a..5d21e2970de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -400,6 +400,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     if (PipeDataNodeAgent.runtime().isShutdown()) {
       return;
     }
+    final Optional<Logger> logger =
+        PipeDataNodeResourceManager.log()
+            .schedule(
+                PipeDataNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
 
     final Set<Integer> dataRegionIds =
         StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -411,13 +418,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeDataNodeResourceManager.log()
-              .schedule(
-                  PipeDataNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -464,7 +464,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                     remainingEventAndTime.getLeft(),
                     remainingEventAndTime.getRight()));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
@@ -479,10 +479,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   protected void collectPipeMetaListInternal(
       final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
+    // If the heartbeatId == Long.MIN_VALUE then it's shutdown report and 
shall not be skipped
     if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != 
Long.MIN_VALUE) {
       return;
     }
-    LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
+    final Optional<Logger> logger =
+        PipeDataNodeResourceManager.log()
+            .schedule(
+                PipeDataNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
+    LOGGER.debug("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
 
     final Set<Integer> dataRegionIds =
         StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -494,13 +502,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeDataNodeResourceManager.log()
-              .schedule(
-                  PipeDataNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -547,7 +548,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                     remainingEventAndTime.getLeft(),
                     remainingEventAndTime.getRight()));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
@@ -839,6 +840,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
         LOGGER.warn("Failed to persist progress index to configNode, status: 
{}", result);
+      } else {
+        LOGGER.info("Successfully persisted all pipe's info to configNode.");
       }
     } catch (final Exception e) {
       LOGGER.warn(e.getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 58765a9cddb..f7adde82e56 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -263,7 +263,7 @@ public class CommonConfig {
       (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
-  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
+  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
@@ -294,8 +294,8 @@ public class CommonConfig {
   private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
   private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
 
-  private int pipeMetaReportMaxLogNumPerRound = 10;
-  private int pipeMetaReportMaxLogIntervalRounds = 36;
+  private double pipeMetaReportMaxLogNumPerRound = 0.1;
+  private int pipeMetaReportMaxLogIntervalRounds = 360;
   private int pipeTsFilePinMaxLogNumPerRound = 10;
   private int pipeTsFilePinMaxLogIntervalRounds = 90;
   private int pipeWalPinMaxLogNumPerRound = 10;
@@ -1698,11 +1698,11 @@ public class CommonConfig {
     logger.info("pipeFlushAfterTerminateCount is set to {}", 
pipeFlushAfterTerminateCount);
   }
 
-  public int getPipeMetaReportMaxLogNumPerRound() {
+  public double getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
 
-  public void setPipeMetaReportMaxLogNumPerRound(int 
pipeMetaReportMaxLogNumPerRound) {
+  public void setPipeMetaReportMaxLogNumPerRound(double 
pipeMetaReportMaxLogNumPerRound) {
     if (this.pipeMetaReportMaxLogNumPerRound == 
pipeMetaReportMaxLogNumPerRound) {
       return;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index cb59931ba08..fdbda7fefd4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -393,7 +393,7 @@ public class PipeConfig {
 
   /////////////////////////////// Logger ///////////////////////////////
 
-  public int getPipeMetaReportMaxLogNumPerRound() {
+  public double getPipeMetaReportMaxLogNumPerRound() {
     return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 6f04a652b9d..852beda7ed9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -128,7 +128,7 @@ public class PipeDescriptor {
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
 
     config.setPipeMetaReportMaxLogNumPerRound(
-        Integer.parseInt(
+        Double.parseDouble(
             properties.getProperty(
                 "pipe_meta_report_max_log_num_per_round",
                 String.valueOf(config.getPipeMetaReportMaxLogNumPerRound()))));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index 49699fdf878..69d8b5294db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -34,7 +34,7 @@ public class PipeLogManager {
 
   public Optional<Logger> schedule(
       final Class<?> logClass,
-      final int maxAverageScale,
+      final double maxAverageScale,
       final int maxLogInterval,
       final int scale) {
     return logClass2LogStatusMap
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
index 9348708281f..5427de9a831 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
@@ -29,11 +29,11 @@ class PipeLogStatus {
 
   private final Logger logger;
 
-  private final int maxAverageScale;
+  private final double maxAverageScale;
   private final int maxLogInterval;
   private final AtomicLong currentRounds = new AtomicLong(0);
 
-  PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int 
maxLogInterval) {
+  PipeLogStatus(final Class<?> logClass, final double maxAverageScale, final 
int maxLogInterval) {
     logger = LoggerFactory.getLogger(logClass);
 
     this.maxAverageScale = maxAverageScale;

Reply via email to