This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b18a87b696 Pipe: Unbinded the default meter for remaining insertion 
event count for hybrid degrading & Changed some default values of related 
properties (#15615)
8b18a87b696 is described below

commit 8b18a87b696aa54ff848f6a2400b67628ff67b5a
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 30 10:25:09 2025 +0800

    Pipe: Unbinded the default meter for remaining insertion event count for 
hybrid degrading & Changed some default values of related properties (#15615)
---
 .../PipeConfigNodeRemainingTimeOperator.java       |  4 ++--
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  6 ++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 27 +++++++++++++++++-----
 ...meRateAverageTime.java => PipeRateAverage.java} |  8 ++-----
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 ++++++--
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 11 +++++++--
 6 files changed, 45 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
index 2c28fd02e50..0b2f79ecfb1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.confignode.manager.pipe.metric.overview;
 
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -56,7 +56,7 @@ class PipeConfigNodeRemainingTimeOperator extends 
PipeRemainingOperator {
    * @return The estimated remaining time
    */
   double getRemainingTime() {
-    final PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
+    final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
         PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
 
     // Do not calculate heartbeat event
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 218448da0d6..7aee55df429 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.metric.overview;
 
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
@@ -106,7 +106,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
       lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
     }
     return PipeConfig.getInstance()
-        .getPipeRemainingTimeCommitRateAverageTime()
+        .getPipeRemainingInsertNodeCountAverage()
         .getMeterRate(insertNodeEventCountMeter);
   }
 
@@ -135,7 +135,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
    * @return The estimated remaining time
    */
   double getRemainingTime() {
-    final PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
+    final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
         PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
 
     final double invocationValue = collectInvocationHistogram.getMean();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4a1c9ea104c..3004ace2e30 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -280,7 +280,7 @@ public class CommonConfig {
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
   private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
   private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
-  private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
+  private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -301,8 +301,8 @@ public class CommonConfig {
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
   private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
-  private PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
-      PipeRemainingTimeRateAverageTime.MEAN;
+  private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = 
PipeRateAverage.FIVE_MINUTES;
+  private PipeRateAverage pipeRemainingInsertNodeCountAverage = 
PipeRateAverage.ONE_MINUTE;
   private double pipeTsFileScanParsingThreshold = 0.05;
   private double pipeDynamicMemoryHistoryWeight = 0.5;
   private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
@@ -1831,12 +1831,12 @@ public class CommonConfig {
         pipeRemainingTimeCommitRateAutoSwitchSeconds);
   }
 
-  public PipeRemainingTimeRateAverageTime 
getPipeRemainingTimeCommitRateAverageTime() {
+  public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
     return pipeRemainingTimeCommitRateAverageTime;
   }
 
   public void setPipeRemainingTimeCommitRateAverageTime(
-      PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) 
{
+      PipeRateAverage pipeRemainingTimeCommitRateAverageTime) {
     if (Objects.equals(
         this.pipeRemainingTimeCommitRateAverageTime, 
pipeRemainingTimeCommitRateAverageTime)) {
       return;
@@ -1847,6 +1847,21 @@ public class CommonConfig {
         pipeRemainingTimeCommitRateAverageTime);
   }
 
+  public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
+    return pipeRemainingInsertNodeCountAverage;
+  }
+
+  public void setPipeRemainingInsertNodeCountAverage(
+      PipeRateAverage pipeRemainingInsertNodeCountAverage) {
+    if (Objects.equals(
+        this.pipeRemainingInsertNodeCountAverage, 
pipeRemainingInsertNodeCountAverage)) {
+      return;
+    }
+    this.pipeRemainingInsertNodeCountAverage = 
pipeRemainingInsertNodeCountAverage;
+    logger.info(
+        "pipeRemainingInsertEventCountAverage is set to {}", 
pipeRemainingInsertNodeCountAverage);
+  }
+
   public double getPipeTsFileScanParsingThreshold() {
     return pipeTsFileScanParsingThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
similarity index 81%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
index 25f799d09f6..2a08d2f9608 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
@@ -19,11 +19,9 @@
 
 package org.apache.iotdb.commons.enums;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
 import com.codahale.metrics.Meter;
 
-public enum PipeRemainingTimeRateAverageTime {
+public enum PipeRateAverage {
   ONE_MINUTE,
   FIVE_MINUTES,
   FIFTEEN_MINUTES,
@@ -41,9 +39,7 @@ public enum PipeRemainingTimeRateAverageTime {
         return meter.getMeanRate();
       default:
         throw new UnsupportedOperationException(
-            String.format(
-                "The type %s is not supported in average time of pipe 
remaining time.",
-                
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
+            String.format("The type %s is not supported in pipe rate 
average.", this));
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b10d5580f73..b099e713dcc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.config;
 
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -223,10 +223,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
   }
 
-  public PipeRemainingTimeRateAverageTime 
getPipeRemainingTimeCommitRateAverageTime() {
+  public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
     return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
   }
 
+  public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
+    return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
+  }
+
   public double getPipeTsFileScanParsingThreshold() {
     return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
   }
@@ -513,6 +517,8 @@ public class PipeConfig {
         getPipeRemainingTimeCommitAutoSwitchSeconds());
     LOGGER.info(
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
+    LOGGER.info(
+        "PipePipeRemainingInsertEventCountAverage: {}", 
getPipeRemainingInsertNodeCountAverage());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
     LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", 
getPipeDynamicMemoryHistoryWeight());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index ea2829172ce..cc9850e90d3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.commons.pipe.config;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
 import org.apache.iotdb.commons.conf.TrimProperties;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -184,12 +184,19 @@ public class PipeDescriptor {
                 "pipe_snapshot_execution_max_batch_size",
                 
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
     config.setPipeRemainingTimeCommitRateAverageTime(
-        PipeRemainingTimeRateAverageTime.valueOf(
+        PipeRateAverage.valueOf(
             properties
                 .getProperty(
                     "pipe_remaining_time_commit_rate_average_time",
                     
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
                 .trim()));
+    config.setPipeRemainingInsertNodeCountAverage(
+        PipeRateAverage.valueOf(
+            properties
+                .getProperty(
+                    "pipe_remaining_insert_node_count_average",
+                    
String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
+                .trim()));
   }
 
   public static void loadPipeInternalConfig(CommonConfig config, 
TrimProperties properties)

Reply via email to