This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b18a87b696 Pipe: Unbound the default meter for remaining insertion
event count for hybrid degrading & Changed some default values of related
properties (#15615)
8b18a87b696 is described below
commit 8b18a87b696aa54ff848f6a2400b67628ff67b5a
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 30 10:25:09 2025 +0800
Pipe: Unbound the default meter for remaining insertion event count for
hybrid degrading & Changed some default values of related properties (#15615)
---
.../PipeConfigNodeRemainingTimeOperator.java | 4 ++--
.../PipeDataNodeRemainingEventAndTimeOperator.java | 6 ++---
.../apache/iotdb/commons/conf/CommonConfig.java | 27 +++++++++++++++++-----
...meRateAverageTime.java => PipeRateAverage.java} | 8 ++-----
.../iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++--
.../iotdb/commons/pipe/config/PipeDescriptor.java | 11 +++++++--
6 files changed, 45 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
index 2c28fd02e50..0b2f79ecfb1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.manager.pipe.metric.overview;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -56,7 +56,7 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
* @return The estimated remaining time
*/
double getRemainingTime() {
- final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
+ final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
// Do not calculate heartbeat event
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 218448da0d6..7aee55df429 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.metric.overview;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
@@ -106,7 +106,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
}
return PipeConfig.getInstance()
- .getPipeRemainingTimeCommitRateAverageTime()
+ .getPipeRemainingInsertNodeCountAverage()
.getMeterRate(insertNodeEventCountMeter);
}
@@ -135,7 +135,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
* @return The estimated remaining time
*/
double getRemainingTime() {
- final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
+ final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
final double invocationValue = collectInvocationHistogram.getMean();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4a1c9ea104c..3004ace2e30 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
@@ -280,7 +280,7 @@ public class CommonConfig {
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
- private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
+ private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -301,8 +301,8 @@ public class CommonConfig {
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
- private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
- PipeRemainingTimeRateAverageTime.MEAN;
+ private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = PipeRateAverage.FIVE_MINUTES;
+ private PipeRateAverage pipeRemainingInsertNodeCountAverage = PipeRateAverage.ONE_MINUTE;
private double pipeTsFileScanParsingThreshold = 0.05;
private double pipeDynamicMemoryHistoryWeight = 0.5;
private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
@@ -1831,12 +1831,12 @@ public class CommonConfig {
pipeRemainingTimeCommitRateAutoSwitchSeconds);
}
- public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
+ public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
return pipeRemainingTimeCommitRateAverageTime;
}
public void setPipeRemainingTimeCommitRateAverageTime(
- PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) {
+ PipeRateAverage pipeRemainingTimeCommitRateAverageTime) {
if (Objects.equals(
this.pipeRemainingTimeCommitRateAverageTime, pipeRemainingTimeCommitRateAverageTime)) {
return;
@@ -1847,6 +1847,21 @@ public class CommonConfig {
pipeRemainingTimeCommitRateAverageTime);
}
+ public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
+ return pipeRemainingInsertNodeCountAverage;
+ }
+
+ public void setPipeRemainingInsertNodeCountAverage(
+ PipeRateAverage pipeRemainingInsertNodeCountAverage) {
+ if (Objects.equals(
+ this.pipeRemainingInsertNodeCountAverage, pipeRemainingInsertNodeCountAverage)) {
+ return;
+ }
+ this.pipeRemainingInsertNodeCountAverage = pipeRemainingInsertNodeCountAverage;
+ logger.info(
+ "pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountAverage);
+ }
+
public double getPipeTsFileScanParsingThreshold() {
return pipeTsFileScanParsingThreshold;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
similarity index 81%
rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
index 25f799d09f6..2a08d2f9608 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
@@ -19,11 +19,9 @@
package org.apache.iotdb.commons.enums;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
import com.codahale.metrics.Meter;
-public enum PipeRemainingTimeRateAverageTime {
+public enum PipeRateAverage {
ONE_MINUTE,
FIVE_MINUTES,
FIFTEEN_MINUTES,
@@ -41,9 +39,7 @@ public enum PipeRemainingTimeRateAverageTime {
return meter.getMeanRate();
default:
throw new UnsupportedOperationException(
- String.format(
- "The type %s is not supported in average time of pipe remaining time.",
- PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
+ String.format("The type %s is not supported in pipe rate average.", this));
}
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b10d5580f73..b099e713dcc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.config;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,10 +223,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
}
- public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
+ public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}
+ public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
+ return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
+ }
+
public double getPipeTsFileScanParsingThreshold() {
return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
}
@@ -513,6 +517,8 @@ public class PipeConfig {
getPipeRemainingTimeCommitAutoSwitchSeconds());
LOGGER.info(
"PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime());
+ LOGGER.info(
+ "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index ea2829172ce..cc9850e90d3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.commons.pipe.config;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
import org.apache.iotdb.commons.conf.TrimProperties;
-import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
+import org.apache.iotdb.commons.enums.PipeRateAverage;
import java.io.IOException;
import java.util.Optional;
@@ -184,12 +184,19 @@ public class PipeDescriptor {
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
config.setPipeRemainingTimeCommitRateAverageTime(
- PipeRemainingTimeRateAverageTime.valueOf(
+ PipeRateAverage.valueOf(
properties
.getProperty(
"pipe_remaining_time_commit_rate_average_time",
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
.trim()));
+ config.setPipeRemainingInsertNodeCountAverage(
+ PipeRateAverage.valueOf(
+ properties
+ .getProperty(
+ "pipe_remaining_insert_node_count_average",
+ String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
+ .trim()));
}
public static void loadPipeInternalConfig(CommonConfig config, TrimProperties properties)