This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c08dc674c74 Pipe: Reduced the downgraded epochs' downgrading limit to
reduce the latency (#17184)
c08dc674c74 is described below
commit c08dc674c74fd72cf02088d613d2817af7bbed22
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:21:01 2026 +0800
Pipe: Reduced the downgraded epochs' downgrading limit to reduce the
latency (#17184)
---
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 2 +-
.../PipeRealtimeDataRegionHybridSource.java | 16 ++++++++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 30 ++++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 14 ++++++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 ++++++++
5 files changed, 69 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 9ee89189e13..9dbb7b3a74c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -112,7 +112,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
device2Measurements = null;
}
- public boolean mayExtractorUseTablets(final PipeRealtimeDataRegionSource
extractor) {
+ public boolean maySourceOnlyUseTablets(final PipeRealtimeDataRegionSource
extractor) {
final TsFileEpoch.State state = tsFileEpoch.getState(extractor);
return state.equals(TsFileEpoch.State.EMPTY) ||
state.equals(TsFileEpoch.State.USING_TABLET);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 7cf476a66cb..0721683f4d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -205,11 +206,22 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
- final long totalFloatingMemorySizeInBytes =
+ long totalFloatingMemorySizeInBytes =
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
+ // If the occupied memory has reached the limit, queuing may cause a
large latency at the
+ // receiver. To reduce it, we forcibly lower the memory limit for events
of a tsFile that
+ // can no longer be sent as tablets only: that tsFile will be transferred
anyway, so more
+ // downgrading only delays a few points while greatly reducing the latency
of incoming data.
+ if (PipeConfig.getInstance().getPipeRealtimeForceDowngradingEnabled()
+ && !event.maySourceOnlyUseTablets(this)) {
+ totalFloatingMemorySizeInBytes =
+ (long)
+ ((double) totalFloatingMemorySizeInBytes
+ *
PipeConfig.getInstance().getPipeRealtimeForceDowngradingProportion());
+ }
final boolean mayInsertNodeMemoryReachDangerousThreshold =
floatingMemoryUsageInByte * pipeCount >=
totalFloatingMemorySizeInBytes;
- if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
+ if (mayInsertNodeMemoryReachDangerousThreshold &&
event.maySourceOnlyUseTablets(this)) {
final PipeDataNodeRemainingEventAndTimeOperator operator =
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 73673bb3136..9990c6a958b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -210,6 +210,8 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+ private boolean pipeRealtimeForceDowngradingEnabled = true;
+ private double pipeRealtimeForceDowngradingProportion = 0.25d;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -1549,6 +1551,34 @@ public class CommonConfig {
pipeRealTimeQueueMaxWaitingTsFileSize);
}
+ public boolean getPipeRealtimeForceDowngradingEnabled() {
+ return pipeRealtimeForceDowngradingEnabled;
+ }
+
+ /**
+ * Sets the {@code pipeRealtimeForceDowngradingEnabled} flag and logs the new
value. Does nothing
+ * (and logs nothing) when the given value equals the current one.
+ *
+ * @param pipeRealtimeForceDowngradingEnabled the new value of the flag
+ */
+ public void setPipeRealtimeForceDowngradingEnabled(boolean
pipeRealtimeForceDowngradingEnabled) {
+ if (this.pipeRealtimeForceDowngradingEnabled ==
pipeRealtimeForceDowngradingEnabled) {
+ return;
+ }
+ this.pipeRealtimeForceDowngradingEnabled =
pipeRealtimeForceDowngradingEnabled;
+ // Log under the field's own name so the message matches the setting that
changed.
+ logger.info(
+ "pipeRealtimeForceDowngradingEnabled is set to {}.",
pipeRealtimeForceDowngradingEnabled);
+ }
+
+ public double getPipeRealtimeForceDowngradingProportion() {
+ return pipeRealtimeForceDowngradingProportion;
+ }
+
+ public void setPipeRealtimeForceDowngradingProportion(
+ double pipeRealtimeForceDowngradingProportion) {
+ if (this.pipeRealtimeForceDowngradingProportion ==
pipeRealtimeForceDowngradingProportion) {
+ return;
+ }
+ this.pipeRealtimeForceDowngradingProportion =
pipeRealtimeForceDowngradingProportion;
+ logger.info(
+ "pipeRealtimeForceDowngradingProportion is set to {}.",
+ pipeRealtimeForceDowngradingProportion);
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ed1e39911f..15ae89ee620 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -117,6 +117,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
}
+ public boolean getPipeRealtimeForceDowngradingEnabled() {
+ return COMMON_CONFIG.getPipeRealtimeForceDowngradingEnabled();
+ }
+
+ public double getPipeRealtimeForceDowngradingProportion() {
+ return COMMON_CONFIG.getPipeRealtimeForceDowngradingProportion();
+ }
+
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -477,6 +485,12 @@ public class PipeConfig {
LOGGER.info(
"PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
getPipeRealTimeQueuePollHistoricalTsFileThreshold());
+ LOGGER.info(
+ "PipeRealTimeQueueMaxWaitingTsFileSize: {}",
getPipeRealTimeQueueMaxWaitingTsFileSize());
+ LOGGER.info(
+ "PipeRealtimeForceDowngradingEnabled: {}",
getPipeRealtimeForceDowngradingEnabled());
+ LOGGER.info(
+ "PipeRealtimeForceDowngradingProportion: {}",
getPipeRealtimeForceDowngradingProportion());
LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 2aeab95972d..05592809bbb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -273,6 +273,16 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realTime_queue_max_waiting_tsFile_size",
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+ config.setPipeRealtimeForceDowngradingEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_realtime_force_downgrading_enabled",
+
String.valueOf(config.getPipeRealtimeForceDowngradingEnabled()))));
+ config.setPipeRealtimeForceDowngradingProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_realtime_force_downgrading_proportion",
+
String.valueOf(config.getPipeRealtimeForceDowngradingProportion()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(