This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch downgrade-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/downgrade-13 by this push:
     new 4d37caf3c72 Pipe: Reduced the downgraded epochs' downgrading limit to 
reduce the latency (#17184)
4d37caf3c72 is described below

commit 4d37caf3c7234bd330dae00dc17db08a47597372
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:21:01 2026 +0800

    Pipe: Reduced the downgraded epochs' downgrading limit to reduce the 
latency (#17184)
---
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  2 +-
 .../PipeRealtimeDataRegionHybridSource.java        | 16 ++++++++++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 30 ++++++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 14 ++++++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 10 ++++++++
 5 files changed, 69 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 07658d83cf4..c8474e360da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -88,7 +88,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     device2Measurements = null;
   }
 
-  public boolean mayExtractorUseTablets(final PipeRealtimeDataRegionSource 
extractor) {
+  public boolean maySourceOnlyUseTablets(final PipeRealtimeDataRegionSource 
extractor) {
     final TsFileEpoch.State state = tsFileEpoch.getState(extractor);
     return state.equals(TsFileEpoch.State.EMPTY) || 
state.equals(TsFileEpoch.State.USING_TABLET);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 347c936cc8f..5566b7d7dee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.source.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -205,11 +206,22 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     final long floatingMemoryUsageInByte =
         PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
     final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
-    final long totalFloatingMemorySizeInBytes =
+    long totalFloatingMemorySizeInBytes =
         PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
+    // If the occupied memory has reached the max, queuing may cause a large latency at the
+    // receiver. To reduce that latency, we forcibly lower the memory limit for a tsFile that
+    // is already doomed to be transferred: further downgrading only adds latency to a few
+    // points, while greatly reducing the latency of the incoming data.
+    if (PipeConfig.getInstance().getPipeRealtimeForceDowngradingEnabled()
+        && !event.maySourceOnlyUseTablets(this)) {
+      totalFloatingMemorySizeInBytes =
+          (long)
+              ((double) totalFloatingMemorySizeInBytes
+                  * 
PipeConfig.getInstance().getPipeRealtimeForceDowngradingProportion());
+    }
     final boolean mayInsertNodeMemoryReachDangerousThreshold =
         floatingMemoryUsageInByte * pipeCount >= 
totalFloatingMemorySizeInBytes;
-    if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
+    if (mayInsertNodeMemoryReachDangerousThreshold && 
event.maySourceOnlyUseTablets(this)) {
       final PipeDataNodeRemainingEventAndTimeOperator operator =
           
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
       LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9e4212c0b2b..a60c870959f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -206,6 +206,8 @@ public class CommonConfig {
   // Sequentially poll the tsFile by default
   private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
   private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
+  private boolean pipeRealtimeForceDowngradingEnabled = true;
+  private double pipeRealtimeForceDowngradingProportion = 0.25d;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -1491,6 +1493,34 @@ public class CommonConfig {
         pipeRealTimeQueueMaxWaitingTsFileSize);
   }
 
+  public boolean getPipeRealtimeForceDowngradingEnabled() {
+    return pipeRealtimeForceDowngradingEnabled;
+  }
+
+  public void setPipeRealtimeForceDowngradingEnabled(boolean 
pipeRealtimeForceDowngradingEnabled) {
+    if (this.pipeRealtimeForceDowngradingEnabled == 
pipeRealtimeForceDowngradingEnabled) {
+      return;
+    }
+    this.pipeRealtimeForceDowngradingEnabled = 
pipeRealtimeForceDowngradingEnabled;
+    logger.info(
+        "pipeRealtimeForceDowngradingTime is set to {}.", 
pipeRealtimeForceDowngradingEnabled);
+  }
+
+  public double getPipeRealtimeForceDowngradingProportion() {
+    return pipeRealtimeForceDowngradingProportion;
+  }
+
+  public void setPipeRealtimeForceDowngradingProportion(
+      double pipeRealtimeForceDowngradingProportion) {
+    if (this.pipeRealtimeForceDowngradingProportion == 
pipeRealtimeForceDowngradingProportion) {
+      return;
+    }
+    this.pipeRealtimeForceDowngradingProportion = 
pipeRealtimeForceDowngradingProportion;
+    logger.info(
+        "pipeRealtimeForceDowngradingProportion is set to {}.",
+        pipeRealtimeForceDowngradingProportion);
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
       return;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a815800e23f..733d7b258dd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -117,6 +117,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
   }
 
+  public boolean getPipeRealtimeForceDowngradingEnabled() {
+    return COMMON_CONFIG.getPipeRealtimeForceDowngradingEnabled();
+  }
+
+  public double getPipeRealtimeForceDowngradingProportion() {
+    return COMMON_CONFIG.getPipeRealtimeForceDowngradingProportion();
+  }
+
   /////////////////////////////// Subtask Executor 
///////////////////////////////
 
   public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -477,6 +485,12 @@ public class PipeConfig {
     LOGGER.info(
         "PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
         getPipeRealTimeQueuePollHistoricalTsFileThreshold());
+    LOGGER.info(
+        "PipeRealTimeQueueMaxWaitingTsFileSize: {}", 
getPipeRealTimeQueueMaxWaitingTsFileSize());
+    LOGGER.info(
+        "PipeRealtimeForceDowngradingEnabled: {}", 
getPipeRealtimeForceDowngradingEnabled());
+    LOGGER.info(
+        "PipeRealtimeForceDowngradingProportion: {}", 
getPipeRealtimeForceDowngradingProportion());
 
     LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", 
getPipeSubtaskExecutorMaxThreadNum());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5e11df2d086..ed7723b4972 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -273,6 +273,16 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_realTime_queue_max_waiting_tsFile_size",
                 
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
+    config.setPipeRealtimeForceDowngradingEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_realtime_force_downgrading_enabled",
+                
String.valueOf(config.getPipeRealtimeForceDowngradingEnabled()))));
+    config.setPipeRealtimeForceDowngradingProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_realtime_force_downgrading_proportion",
+                
String.valueOf(config.getPipeRealtimeForceDowngradingProportion()))));
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
         Integer.parseInt(
             properties.getProperty(

Reply via email to