This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch downgrade-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/downgrade-13 by this push:
new 4d37caf3c72 Pipe: Reduced the downgraded epochs' downgrading limit to
reduce the latency (#17184)
4d37caf3c72 is described below
commit 4d37caf3c7234bd330dae00dc17db08a47597372
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:21:01 2026 +0800
Pipe: Reduced the downgraded epochs' downgrading limit to reduce the
latency (#17184)
---
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 2 +-
.../PipeRealtimeDataRegionHybridSource.java | 16 ++++++++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 30 ++++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 14 ++++++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 ++++++++
5 files changed, 69 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 07658d83cf4..c8474e360da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -88,7 +88,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
device2Measurements = null;
}
- public boolean mayExtractorUseTablets(final PipeRealtimeDataRegionSource
extractor) {
+ public boolean maySourceOnlyUseTablets(final PipeRealtimeDataRegionSource
extractor) {
final TsFileEpoch.State state = tsFileEpoch.getState(extractor);
return state.equals(TsFileEpoch.State.EMPTY) ||
state.equals(TsFileEpoch.State.USING_TABLET);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 347c936cc8f..5566b7d7dee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -205,11 +206,22 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
- final long totalFloatingMemorySizeInBytes =
+ long totalFloatingMemorySizeInBytes =
PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
+ // If the occupied memory has reached the max, it may cause a large
latency to the receiver due
+ // to queuing. To reduce the latency, we forcibly lower the memory limit
within a single tsFile:
+ // since that tsFile is already doomed to be transferred, further downgrading
only adds
+ // latency to a few points while greatly reducing the latency of incoming events.
+ if (PipeConfig.getInstance().getPipeRealtimeForceDowngradingEnabled()
+ && !event.maySourceOnlyUseTablets(this)) {
+ totalFloatingMemorySizeInBytes =
+ (long)
+ ((double) totalFloatingMemorySizeInBytes
+ *
PipeConfig.getInstance().getPipeRealtimeForceDowngradingProportion());
+ }
final boolean mayInsertNodeMemoryReachDangerousThreshold =
floatingMemoryUsageInByte * pipeCount >=
totalFloatingMemorySizeInBytes;
- if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
+ if (mayInsertNodeMemoryReachDangerousThreshold &&
event.maySourceOnlyUseTablets(this)) {
final PipeDataNodeRemainingEventAndTimeOperator operator =
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9e4212c0b2b..a60c870959f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -206,6 +206,8 @@ public class CommonConfig {
// Sequentially poll the tsFile by default
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+ private boolean pipeRealtimeForceDowngradingEnabled = true;
+ private double pipeRealtimeForceDowngradingProportion = 0.25d;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -1491,6 +1493,34 @@ public class CommonConfig {
pipeRealTimeQueueMaxWaitingTsFileSize);
}
+ public boolean getPipeRealtimeForceDowngradingEnabled() {
+ return pipeRealtimeForceDowngradingEnabled;
+ }
+
+ public void setPipeRealtimeForceDowngradingEnabled(boolean
pipeRealtimeForceDowngradingEnabled) {
+ if (this.pipeRealtimeForceDowngradingEnabled ==
pipeRealtimeForceDowngradingEnabled) {
+ return; // value unchanged: nothing to update or log
+ }
+ this.pipeRealtimeForceDowngradingEnabled =
pipeRealtimeForceDowngradingEnabled;
+ logger.info(
+ "pipeRealtimeForceDowngradingEnabled is set to {}.",
pipeRealtimeForceDowngradingEnabled);
+ }
+
+ public double getPipeRealtimeForceDowngradingProportion() {
+ return pipeRealtimeForceDowngradingProportion;
+ }
+
+ public void setPipeRealtimeForceDowngradingProportion(
+ double pipeRealtimeForceDowngradingProportion) {
+ if (this.pipeRealtimeForceDowngradingProportion ==
pipeRealtimeForceDowngradingProportion) {
+ return;
+ }
+ this.pipeRealtimeForceDowngradingProportion =
pipeRealtimeForceDowngradingProportion;
+ logger.info(
+ "pipeRealtimeForceDowngradingProportion is set to {}.",
+ pipeRealtimeForceDowngradingProportion);
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a815800e23f..733d7b258dd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -117,6 +117,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
}
+ public boolean getPipeRealtimeForceDowngradingEnabled() {
+ return COMMON_CONFIG.getPipeRealtimeForceDowngradingEnabled();
+ }
+
+ public double getPipeRealtimeForceDowngradingProportion() {
+ return COMMON_CONFIG.getPipeRealtimeForceDowngradingProportion();
+ }
+
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -477,6 +485,12 @@ public class PipeConfig {
LOGGER.info(
"PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
getPipeRealTimeQueuePollHistoricalTsFileThreshold());
+ LOGGER.info(
+ "PipeRealTimeQueueMaxWaitingTsFileSize: {}",
getPipeRealTimeQueueMaxWaitingTsFileSize());
+ LOGGER.info(
+ "PipeRealtimeForceDowngradingEnabled: {}",
getPipeRealtimeForceDowngradingEnabled());
+ LOGGER.info(
+ "PipeRealtimeForceDowngradingProportion: {}",
getPipeRealtimeForceDowngradingProportion());
LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5e11df2d086..ed7723b4972 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -273,6 +273,16 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realTime_queue_max_waiting_tsFile_size",
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+ config.setPipeRealtimeForceDowngradingEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_realtime_force_downgrading_enabled",
+
String.valueOf(config.getPipeRealtimeForceDowngradingEnabled()))));
+ config.setPipeRealtimeForceDowngradingProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_realtime_force_downgrading_proportion",
+
String.valueOf(config.getPipeRealtimeForceDowngradingProportion()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(