This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 3a61340a081 Pipe: Fixed the insert node EMA logic for degrading
(#15741) (#15758)
3a61340a081 is described below
commit 3a61340a081b42f630c31e5e9180f4e74b228e1d
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 18 14:46:48 2025 +0800
Pipe: Fixed the insert node EMA logic for degrading (#15741) (#15758)
---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 30 ++++++++++---------
.../apache/iotdb/commons/conf/CommonConfig.java | 34 +++++-----------------
.../iotdb/commons/pipe/config/PipeConfig.java | 13 ++-------
.../iotdb/commons/pipe/config/PipeDescriptor.java | 14 +++------
4 files changed, 31 insertions(+), 60 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 47dc0ff18b9..86368acf353 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -59,9 +59,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private Timer insertNodeTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer tsfileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
- private final Meter insertNodeEventCountMeter =
- new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
+ private final InsertNodeEMA insertNodeEventCountEMA = new InsertNodeEMA();
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -105,17 +103,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
}
double getRemainingInsertEventSmoothingCount() {
- if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() == PipeRateAverage.NONE) {
- return insertNodeEventCount.get();
- }
- if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
- >= PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds()) {
- insertNodeEventCountMeter.mark(insertNodeEventCount.get());
- lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
- }
- return PipeConfig.getInstance()
- .getPipeRemainingInsertNodeCountAverage()
- .getMeterRate(insertNodeEventCountMeter);
+ insertNodeEventCountEMA.update(insertNodeEventCount.get());
+ return insertNodeEventCountEMA.insertNodeEMAValue;
}
long getRemainingEvents() {
@@ -277,4 +266,17 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
dataRegionCommitMeter.set(null);
schemaRegionCommitMeter.set(null);
}
+
+ private static class InsertNodeEMA {
+ private double insertNodeEMAValue;
+
+ public void update(final double newValue) {
+ final double alpha = PipeConfig.getInstance().getPipeRemainingInsertNodeCountEMAAlpha();
+ if (insertNodeEMAValue == 0) {
+ insertNodeEMAValue = newValue;
+ } else {
+ insertNodeEMAValue = alpha * newValue + (1 - alpha) * insertNodeEMAValue;
+ }
+ }
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e82cc2fb1dd..59d348abe5c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -289,7 +289,6 @@ public class CommonConfig {
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
- private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -311,7 +310,7 @@ public class CommonConfig {
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
private PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeRateAverage.FIVE_MINUTES;
- private PipeRateAverage pipeRemainingInsertNodeCountAverage = PipeRateAverage.ONE_MINUTE;
+ private double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
private double pipeTsFileScanParsingThreshold = 0.05;
private double pipeDynamicMemoryHistoryWeight = 0.5;
private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
@@ -1603,23 +1602,6 @@ public class CommonConfig {
pipeMaxAllowedTotalRemainingInsertEventCount);
}
- public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
- return pipeRemainingEventCountSmoothingIntervalSeconds;
- }
-
- public void setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
- int pipeRemainingEventCountSmoothingIntervalSeconds) {
- if (this.pipeRemainingEventCountSmoothingIntervalSeconds
- == pipeRemainingEventCountSmoothingIntervalSeconds) {
- return;
- }
- this.pipeRemainingEventCountSmoothingIntervalSeconds =
- pipeRemainingEventCountSmoothingIntervalSeconds;
- logger.info(
- "pipeRemainingEventCountSmoothingIntervalSeconds is set to {}",
- pipeRemainingEventCountSmoothingIntervalSeconds);
- }
-
public void setPipeStuckRestartIntervalSeconds(long
pipeStuckRestartIntervalSeconds) {
if (this.pipeStuckRestartIntervalSeconds ==
pipeStuckRestartIntervalSeconds) {
return;
@@ -1935,19 +1917,19 @@ public class CommonConfig {
pipeRemainingTimeCommitRateAverageTime);
}
- public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
- return pipeRemainingInsertNodeCountAverage;
+ public double getPipeRemainingInsertNodeCountEMAAlpha() {
+ return pipeRemainingInsertNodeCountEMAAlpha;
}
- public void setPipeRemainingInsertNodeCountAverage(
- PipeRateAverage pipeRemainingInsertNodeCountAverage) {
+ public void setPipeRemainingInsertNodeCountEMAAlpha(
+ final double pipeRemainingInsertNodeCountEMAAlpha) {
if (Objects.equals(
- this.pipeRemainingInsertNodeCountAverage, pipeRemainingInsertNodeCountAverage)) {
+ this.pipeRemainingInsertNodeCountEMAAlpha, pipeRemainingInsertNodeCountEMAAlpha)) {
return;
}
- this.pipeRemainingInsertNodeCountAverage = pipeRemainingInsertNodeCountAverage;
+ this.pipeRemainingInsertNodeCountEMAAlpha = pipeRemainingInsertNodeCountEMAAlpha;
logger.info(
- "pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountAverage);
+ "pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountEMAAlpha);
}
public double getPipeTsFileScanParsingThreshold() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a616d80b64e..be0c70d7f42 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -251,8 +251,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}
- public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
- return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
+ public double getPipeRemainingInsertNodeCountEMAAlpha() {
+ return COMMON_CONFIG.getPipeRemainingInsertNodeCountEMAAlpha();
}
public double getPipeTsFileScanParsingThreshold() {
@@ -383,10 +383,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMaxAllowedTotalRemainingInsertEventCount();
}
- public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
- return COMMON_CONFIG.getPipeRemainingInsertEventCountSmoothingIntervalSeconds();
- }
-
/////////////////////////////// Logger ///////////////////////////////
public int getPipeMetaReportMaxLogNumPerRound() {
@@ -551,7 +547,7 @@ public class PipeConfig {
LOGGER.info(
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info(
- "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage());
+ "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountEMAAlpha());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
@@ -645,9 +641,6 @@ public class PipeConfig {
LOGGER.info(
"PipeMaxAllowedTotalRemainingInsertEventCount: {}",
getPipeMaxAllowedTotalRemainingInsertEventCount());
- LOGGER.info(
- "PipeRemainingInsertEventCountSmoothingIntervalSeconds: {}",
- getPipeRemainingInsertEventCountSmoothingIntervalSeconds());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1849209d651..7086ff731bf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -208,12 +208,12 @@ public class PipeDescriptor {
"pipe_remaining_time_commit_rate_average_time",
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
.trim()));
- config.setPipeRemainingInsertNodeCountAverage(
- PipeRateAverage.valueOf(
+ config.setPipeRemainingInsertNodeCountEMAAlpha(
+ Double.parseDouble(
properties
.getProperty(
- "pipe_remaining_insert_node_count_average",
- String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
+ "pipe_remaining_insert_node_count_ema_alpha",
+ String.valueOf(config.getPipeRemainingInsertNodeCountEMAAlpha()))
.trim()));
}
@@ -476,12 +476,6 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_max_allowed_total_remaining_insert_event_count",
String.valueOf(config.getPipeMaxAllowedTotalRemainingInsertEventCount()))));
- config.setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
- Integer.parseInt(
- properties.getProperty(
- "pipe_remaining_insert_event_count_smoothing_interval_seconds",
- String.valueOf(
- config.getPipeRemainingInsertEventCountSmoothingIntervalSeconds()))));
config.setPipeStuckRestartMinIntervalMs(
Long.parseLong(
properties.getProperty(