This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e7f80974364 Introduce delay analyzer to get watermark between sequence 
data and unsequence data based on ICDE 2022 (#16886)
e7f80974364 is described below

commit e7f80974364329ea908e04fd630ffd43c145d6de
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Mar 11 20:38:58 2026 -0500

    Introduce delay analyzer to get watermark between sequence data and 
unsequence data based on ICDE 2022 (#16886)
    
    * introduce delay analyzer based on ICDE 2022
    
    * adopt to data-region
    
    * fix ut
    
    * fix
    
    * fix review
    
    * fix review
    
    * empty commit
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  58 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  46 +++
 .../db/storageengine/dataregion/DataRegion.java    |  60 +++-
 .../org/apache/iotdb/db/tools/DelayAnalyzer.java   | 387 +++++++++++++++++++++
 .../apache/iotdb/db/tools/DelayAnalyzerTest.java   | 361 +++++++++++++++++++
 .../conf/iotdb-system.properties.template          |  32 ++
 6 files changed, 943 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 98c15a2d9bf..ea2ceadfe4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -398,6 +398,24 @@ public class IoTDBConfig {
   /** The interval to check whether unsequence memtables need flushing. Unit: 
ms */
   private long unseqMemtableFlushCheckInterval = 30 * 1000L;
 
+  /**
+   * Whether to enable delay analyzer for tracking data arrival delays and 
calculating safe
+   * watermark
+   */
+  private boolean enableDelayAnalyzer = false;
+
+  /** Default window size for delay analyzer, empirical value: 10000 sample 
points */
+  private int delayAnalyzerWindowSize = 10000;
+
+  /** Minimum window size for delay analyzer validation */
+  private int delayAnalyzerMinWindowSize = 1000;
+
+  /** Maximum window size for delay analyzer validation */
+  private int delayAnalyzerMaxWindowSize = 100000;
+
+  /** Default confidence level for delay analyzer: 99% */
+  private double delayAnalyzerConfidenceLevel = 0.99;
+
   /** The sort algorithm used in TVList */
   private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
 
@@ -2218,6 +2236,46 @@ public class IoTDBConfig {
     this.unseqMemtableFlushCheckInterval = unseqMemtableFlushCheckInterval;
   }
 
+  /**
+   * Returns whether the per-region delay analyzer is enabled. Loaded by {@code IoTDBDescriptor}
+   * from the {@code enable_delay_analyzer} property; defaults to {@code false}.
+   */
+  public boolean isEnableDelayAnalyzer() {
+    return enableDelayAnalyzer;
+  }
+
+  /** Sets whether the per-region delay analyzer is enabled. */
+  public void setEnableDelayAnalyzer(boolean enableDelayAnalyzer) {
+    this.enableDelayAnalyzer = enableDelayAnalyzer;
+  }
+
+  /** Returns the number of delay samples kept in each delay analyzer's sliding window. */
+  public int getDelayAnalyzerWindowSize() {
+    return delayAnalyzerWindowSize;
+  }
+
+  /**
+   * Sets the delay analyzer window size. No validation is done here: {@code IoTDBDescriptor} only
+   * applies positive values, and the {@code DelayAnalyzer} constructor checks the value against
+   * the configured min/max window sizes.
+   */
+  public void setDelayAnalyzerWindowSize(int delayAnalyzerWindowSize) {
+    this.delayAnalyzerWindowSize = delayAnalyzerWindowSize;
+  }
+
+  /** Returns the lower bound used to validate the delay analyzer window size. */
+  public int getDelayAnalyzerMinWindowSize() {
+    return delayAnalyzerMinWindowSize;
+  }
+
+  /** Sets the lower bound used to validate the delay analyzer window size (not validated here). */
+  public void setDelayAnalyzerMinWindowSize(int delayAnalyzerMinWindowSize) {
+    this.delayAnalyzerMinWindowSize = delayAnalyzerMinWindowSize;
+  }
+
+  /** Returns the upper bound used to validate the delay analyzer window size. */
+  public int getDelayAnalyzerMaxWindowSize() {
+    return delayAnalyzerMaxWindowSize;
+  }
+
+  /** Sets the upper bound used to validate the delay analyzer window size (not validated here). */
+  public void setDelayAnalyzerMaxWindowSize(int delayAnalyzerMaxWindowSize) {
+    this.delayAnalyzerMaxWindowSize = delayAnalyzerMaxWindowSize;
+  }
+
+  /** Returns the confidence level (delay quantile) used to compute safe watermarks. */
+  public double getDelayAnalyzerConfidenceLevel() {
+    return delayAnalyzerConfidenceLevel;
+  }
+
+  /**
+   * Sets the confidence level used to compute safe watermarks. No validation is done here;
+   * {@code IoTDBDescriptor} only applies values in (0, 1].
+   */
+  public void setDelayAnalyzerConfidenceLevel(double 
delayAnalyzerConfidenceLevel) {
+    this.delayAnalyzerConfidenceLevel = delayAnalyzerConfidenceLevel;
+  }
+
   public TVListSortAlgorithm getTvListSortAlgorithm() {
     return tvListSortAlgorithm;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6730138b2af..e086d429a4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1144,6 +1144,52 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "include_null_value_in_write_throughput_metric",
                 
String.valueOf(conf.isIncludeNullValueInWriteThroughputMetric()))));
+
+    conf.setEnableDelayAnalyzer(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_delay_analyzer",
+                
ConfigurationFileUtils.getConfigurationDefaultValue("enable_delay_analyzer"))));
+
+    int delayAnalyzerWindowSize =
+        Integer.parseInt(
+            properties.getProperty(
+                "delay_analyzer_window_size",
+                
ConfigurationFileUtils.getConfigurationDefaultValue("delay_analyzer_window_size")));
+    if (delayAnalyzerWindowSize > 0) {
+      LOGGER.warn("[DelayAnalyzer] Set delay_analyzer_window_size to {}", 
delayAnalyzerWindowSize);
+      conf.setDelayAnalyzerWindowSize(delayAnalyzerWindowSize);
+    }
+
+    int delayAnalyzerMinWindowSize =
+        Integer.parseInt(
+            properties.getProperty(
+                "delay_analyzer_min_window_size",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "delay_analyzer_min_window_size")));
+    if (delayAnalyzerMinWindowSize > 0) {
+      conf.setDelayAnalyzerMinWindowSize(delayAnalyzerMinWindowSize);
+    }
+
+    int delayAnalyzerMaxWindowSize =
+        Integer.parseInt(
+            properties.getProperty(
+                "delay_analyzer_max_window_size",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "delay_analyzer_max_window_size")));
+    if (delayAnalyzerMaxWindowSize > 0) {
+      conf.setDelayAnalyzerMaxWindowSize(delayAnalyzerMaxWindowSize);
+    }
+
+    double delayAnalyzerConfidenceLevel =
+        Double.parseDouble(
+            properties.getProperty(
+                "delay_analyzer_confidence_level",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "delay_analyzer_confidence_level")));
+    if (delayAnalyzerConfidenceLevel > 0 && delayAnalyzerConfidenceLevel <= 1) 
{
+      conf.setDelayAnalyzerConfidenceLevel(delayAnalyzerConfidenceLevel);
+    }
   }
 
   private void loadFixedSizeLimitForQuery(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20d53439b95..a60c2722cd5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -156,6 +156,7 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
 import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
+import org.apache.iotdb.db.tools.DelayAnalyzer;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -379,6 +380,9 @@ public class DataRegion implements IDataRegionForQuery {
   private ILoadDiskSelector ordinaryLoadDiskSelector;
   private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
 
+  /** Delay analyzer for tracking data arrival delays and calculating safe 
watermarks */
+  private final DelayAnalyzer delayAnalyzer;
+
   /**
    * Construct a database processor.
    *
@@ -397,6 +401,14 @@ public class DataRegion implements IDataRegionForQuery {
     this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
     this.databaseName = databaseName;
     this.fileFlushPolicy = fileFlushPolicy;
+    this.delayAnalyzer =
+        config.isEnableDelayAnalyzer()
+            ? new DelayAnalyzer(
+                config.getDelayAnalyzerWindowSize(),
+                config.getDelayAnalyzerMinWindowSize(),
+                config.getDelayAnalyzerMaxWindowSize(),
+                config.getDelayAnalyzerConfidenceLevel())
+            : null;
     acquireDirectBufferMemory();
 
     dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, 
dataRegionIdString);
@@ -463,6 +475,14 @@ public class DataRegion implements IDataRegionForQuery {
     partitionMaxFileVersions.put(0L, 0L);
     upgradeModFileThreadPool = null;
     this.metrics = new DataRegionMetrics(this);
+    this.delayAnalyzer =
+        config.isEnableDelayAnalyzer()
+            ? new DelayAnalyzer(
+                config.getDelayAnalyzerWindowSize(),
+                config.getDelayAnalyzerMinWindowSize(),
+                config.getDelayAnalyzerMaxWindowSize(),
+                config.getDelayAnalyzerConfidenceLevel())
+            : null;
 
     initDiskSelector();
   }
@@ -1161,6 +1181,12 @@ public class DataRegion implements IDataRegionForQuery {
       if (deleted) {
         return;
       }
+      if (delayAnalyzer != null) {
+        long arrivalTime = System.currentTimeMillis();
+        long generationTime = insertRowNode.getTime();
+        delayAnalyzer.update(generationTime, arrivalTime);
+      }
+
       // init map
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
       initFlushTimeMap(timePartitionId);
@@ -1346,6 +1372,13 @@ public class DataRegion implements IDataRegionForQuery {
             "Won't insert tablet {}, because region is deleted", 
insertTabletNode.getSearchIndex());
         return;
       }
+      if (delayAnalyzer != null) {
+        long arrivalTime = System.currentTimeMillis();
+        long[] times = insertTabletNode.getTimes();
+        for (long generationTime : times) {
+          delayAnalyzer.update(generationTime, arrivalTime);
+        }
+      }
       TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
       long[] infoForMetrics = new long[5];
@@ -4403,6 +4436,10 @@ public class DataRegion implements IDataRegionForQuery {
       Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new 
HashMap<>();
       for (int i = 0; i < 
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
         InsertRowNode insertRowNode = 
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
+        if (delayAnalyzer != null) {
+          long arrivalTime = System.currentTimeMillis();
+          delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
+        }
         if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
           // we do not need to write these part of data, as they can not be 
queried
           // or the sub-plan has already been executed, we are retrying other 
sub-plans
@@ -4522,6 +4559,10 @@ public class DataRegion implements IDataRegionForQuery {
       long[] timePartitionIds = new 
long[insertRowsNode.getInsertRowNodeList().size()];
       for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
         InsertRowNode insertRowNode = 
insertRowsNode.getInsertRowNodeList().get(i);
+        if (delayAnalyzer != null) {
+          long arrivalTime = System.currentTimeMillis();
+          delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
+        }
         long ttl = getTTL(insertRowNode);
         if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
           insertRowsNode
@@ -4610,9 +4651,16 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[1]: ScheduleMemoryBlockTimeCost
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
-      // infoForMetrics[4]: InsertedPointsNumber
+      // infoForMetrics[4]: InsertedPointsNumber
       for (int i = 0; i < 
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
         InsertTabletNode insertTabletNode = 
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
+        long[] times = insertTabletNode.getTimes();
+        if (delayAnalyzer != null) {
+          for (long generationTime : times) {
+            long arrivalTime = System.currentTimeMillis();
+            delayAnalyzer.update(generationTime, arrivalTime);
+          }
+        }
         TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
         Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
         boolean noFailure = false;
@@ -4916,6 +4964,16 @@ public class DataRegion implements IDataRegionForQuery {
     return tsFileManager;
   }
 
+  /**
+   * Get the delay analyzer instance for this data region.
+   *
+   * @return the DelayAnalyzer that tracks data arrival delays and calculates safe watermarks for
+   *     this region, or {@code null} if the delay analyzer was disabled in the config when this
+   *     region was constructed
+   */
+  public DelayAnalyzer getDelayAnalyzer() {
+    return delayAnalyzer;
+  }
+
   private long getTTL(InsertNode insertNode) {
     if (insertNode.getTableName() == null) {
       return 
DataNodeTTLCache.getInstance().getTTLForTree(insertNode.getTargetPath().getNodes());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
new file mode 100644
index 00000000000..bb76dac6c22
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * DelayAnalyzer: Calculate watermarks for in-order and out-of-order data 
based on statistics and
+ * models
+ *
+ * <p>This implementation is based on the paper "Separation or Not: On 
Handling Out-of-Order
+ * Time-Series Data" (ICDE 2022) proposed method, which dynamically calculates 
safe watermarks by
+ * analyzing the Cumulative Distribution Function (CDF) of data arrival delays.
+ *
+ * <p>Core concepts:
+ *
+ * <ul>
+ *   <li>Record the generation time (Event Time) and arrival time (System 
Time) of each data point
+ *   <li>Calculate delay: delay = arrivalTime - generationTime
+ *   <li>Maintain a sliding window to store recent delay samples
+ *   <li>Use CDF to calculate delay quantiles (e.g., P99) at specific 
confidence levels
+ *   <li>Safe watermark = Current system time - P99 delay
+ * </ul>
+ *
+ * <p>Key formulas from the paper:
+ *
+ * <ul>
+ *   <li>Delay calculation: p.t_d = p.t_a - p.t_g (Formula 160)
+ *   <li>Use F(x) (CDF) to estimate the probability of out-of-order data 
(Reference 110)
+ *   <li>Dynamic delay distribution: Use sliding window to maintain recent 
delay samples
+ * </ul>
+ *
+ * <p>Features:
+ *
+ * <ul>
+ *   <li>Thread-safe: Uses ReadWriteLock to support high-concurrency 
read/write operations
+ *   <li>Memory efficient: Uses circular buffer with fixed memory footprint
+ *   <li>Clock skew handling: Automatically handles negative delays (clock 
rollback)
+ *   <li>Dynamic adaptation: Sliding window automatically adapts to changes in 
delay distribution
+ * </ul>
+ *
+ * @see <a href="https://ieeexplore.ieee.org/document/9835234";>ICDE 2022 
Paper</a>
+ */
+public class DelayAnalyzer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DelayAnalyzer.class);
+
+  /** Default window size, empirical value: 10000 sample points */
+  public static final int DEFAULT_WINDOW_SIZE = 10000;
+
+  /** Recommended window size range */
+  public static final int MIN_WINDOW_SIZE = 1000;
+
+  public static final int MAX_WINDOW_SIZE = 100000;
+
+  /** Default confidence level: 99% */
+  public static final double DEFAULT_CONFIDENCE_LEVEL = 0.99;
+
+  /**
+   * Sliding window size for storing recent delay sample points. A larger 
window provides more
+   * accurate P99 calculations but increases memory and sorting overhead. The 
paper mentions using
+   * dynamic delay distribution to adapt to changes in delay patterns.
+   */
+  private final int windowSize;
+
+  /** Circular buffer storing delay samples (unit: milliseconds) */
+  private final long[] delaySamples;
+
+  /** Current write position (cursor of the circular buffer) */
+  private int cursor = 0;
+
+  /** Flag indicating whether the buffer is full */
+  private boolean isFull = false;
+
+  /** Total number of samples (for statistics) */
+  private long totalSamples = 0;
+
+  /**
+   * ReadWriteLock ensures thread safety in high-concurrency scenarios. Read 
operations (calculating
+   * quantiles) use read lock, write operations (updating delays) use write 
lock.
+   */
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /** Confidence level for calculating safe watermarks */
+  private final double confidenceLevel;
+
+  /** Constructor using default window size */
+  public DelayAnalyzer() {
+    this(DEFAULT_WINDOW_SIZE, true);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param windowSize Sliding window size, recommended range: 5000 - 10000. 
Larger windows provide
+   *     more accurate statistics but increase memory and computational 
overhead.
+   * @throws IllegalArgumentException if window size is not within valid range
+   */
+  public DelayAnalyzer(int windowSize, boolean checkSize) {
+    if (checkSize) {
+      if (windowSize < MIN_WINDOW_SIZE || windowSize > MAX_WINDOW_SIZE) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Window size must be between %d and %d, got %d",
+                MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, windowSize));
+      }
+    }
+
+    this.windowSize = windowSize;
+    this.delaySamples = new long[windowSize];
+    this.confidenceLevel = DEFAULT_CONFIDENCE_LEVEL;
+  }
+
+  /**
+   * Constructor with configurable window size and validation range
+   *
+   * @param windowSize Sliding window size, recommended range: 5000 - 10000. 
Larger windows provide
+   *     more accurate statistics but increase memory and computational 
overhead.
+   * @param minWindowSize Minimum window size for validation
+   * @param maxWindowSize Maximum window size for validation
+   * @throws IllegalArgumentException if window size is not within valid range
+   */
+  public DelayAnalyzer(int windowSize, int minWindowSize, int maxWindowSize) {
+    this(windowSize, minWindowSize, maxWindowSize, DEFAULT_CONFIDENCE_LEVEL);
+  }
+
+  /**
+   * Constructor with configurable window size, validation range and 
confidence level
+   *
+   * @param windowSize Sliding window size, recommended range: 5000 - 10000. 
Larger windows provide
+   *     more accurate statistics but increase memory and computational 
overhead.
+   * @param minWindowSize Minimum window size for validation
+   * @param maxWindowSize Maximum window size for validation
+   * @param confidenceLevel Confidence level for calculating safe watermarks, 
range: (0, 1]
+   * @throws IllegalArgumentException if window size is not within valid range 
or confidence level
+   *     is invalid
+   */
+  public DelayAnalyzer(
+      int windowSize, int minWindowSize, int maxWindowSize, double 
confidenceLevel) {
+    if (windowSize < minWindowSize || windowSize > maxWindowSize) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Window size must be between %d and %d, got %d",
+              minWindowSize, maxWindowSize, windowSize));
+    }
+    if (confidenceLevel <= 0 || confidenceLevel > 1) {
+      throw new IllegalArgumentException(
+          String.format("Confidence level must be between 0 and 1, got %f", 
confidenceLevel));
+    }
+
+    this.windowSize = windowSize;
+    this.delaySamples = new long[windowSize];
+    this.confidenceLevel = confidenceLevel;
+  }
+
+  /**
+   * Core method: Record a new data point
+   *
+   * <p>Corresponds to the formula in the paper: p.t_d = p.t_a - p.t_g 
(Formula 160)
+   *
+   * <p>Delay calculation logic:
+   *
+   * <ul>
+   *   <li>delay = arrivalTime - generationTime
+   *   <li>If delay < 0, it indicates clock skew or out-of-order data, set it 
to 0 (Paper Reference
+   *       26: clock skew correction)
+   * </ul>
+   *
+   * @param generationTime Data generation time (Event Time), unit: 
milliseconds
+   * @param arrivalTime Data arrival time (System Time), unit: milliseconds
+   */
+  public void update(long generationTime, long arrivalTime) {
+    // Calculate delay; delay cannot be negative (clock skew correction, Paper 
Reference 26)
+    long delay = Math.max(0, arrivalTime - generationTime);
+
+    lock.writeLock().lock();
+    try {
+      delaySamples[cursor] = delay;
+      cursor++;
+      totalSamples++;
+
+      // Circular buffer: when cursor reaches window size, reset to 0 and mark 
as full
+      if (cursor >= windowSize) {
+        cursor = 0;
+        isFull = true;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Get the delay threshold for a specific quantile
+   *
+   * <p>Corresponds to the method in the paper using F(x) (CDF) to estimate 
the probability of
+   * out-of-order data (Reference 110).
+   *
+   * <p>Calculation steps:
+   *
+   * <ol>
+   *   <li>Get a snapshot of the current window (to avoid holding write lock 
for too long)
+   *   <li>Sort the samples
+   *   <li>Calculate index position based on quantile
+   *   <li>Return the delay value at the corresponding position
+   * </ol>
+   *
+   * @param percentile Quantile, range (0, 1], e.g., 0.99 represents 99% 
quantile (P99)
+   * @return Delay threshold, unit: milliseconds
+   * @throws IllegalArgumentException if percentile is not within valid range
+   */
+  public long getDelayQuantile(double percentile) {
+    if (percentile <= 0 || percentile > 1) {
+      throw new IllegalArgumentException(
+          String.format("Percentile must be between 0 and 1, got %f", 
percentile));
+    }
+
+    long[] snapshot;
+    int currentSize;
+
+    // Get snapshot to avoid blocking write lock for too long
+    lock.readLock().lock();
+    try {
+      if (!isFull && cursor == 0) {
+        // No data yet
+        return 0;
+      }
+      currentSize = isFull ? windowSize : cursor;
+      snapshot = Arrays.copyOf(delaySamples, currentSize);
+    } finally {
+      lock.readLock().unlock();
+    }
+
+    // Sort to calculate quantile (Arrays.sort performance is sufficient for 
small datasets)
+    Arrays.sort(snapshot);
+
+    // Calculate index corresponding to quantile
+    int index = (int) Math.ceil(currentSize * percentile) - 1;
+    // Boundary correction to prevent index out of bounds
+    index = Math.max(0, Math.min(index, currentSize - 1));
+
+    return snapshot[index];
+  }
+
+  /**
+   * Get the recommended safe watermark
+   *
+   * <p>Definition: At confidence level confidenceLevel, the probability that 
all data before this
+   * timestamp has arrived.
+   *
+   * <p>Calculation formula:
+   *
+   * <pre>
+   *   SafeWatermark = CurrentSystemTime - P99_Delay
+   * </pre>
+   *
+   * <p>Meaning: If current system time is T and P99 delay is D, then data 
before timestamp T-D has
+   * a 99% probability of having all arrived.
+   *
+   * @param currentSystemTime Current system time, unit: milliseconds
+   * @param confidenceLevel Confidence level, e.g., 0.99 represents 99% 
confidence
+   * @return Safe watermark timestamp, unit: milliseconds
+   */
+  public long getSafeWatermark(long currentSystemTime, double confidenceLevel) 
{
+    long pDelay = getDelayQuantile(confidenceLevel);
+    // Watermark = Current time - P99 delay
+    long watermark = currentSystemTime - pDelay;
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "[DelayAnalyzer] Calculated safe watermark: {} (currentTime: {}, P{} 
delay: {}ms)",
+          watermark,
+          currentSystemTime,
+          (int) (confidenceLevel * 100),
+          pDelay);
+    }
+
+    return watermark;
+  }
+
+  /**
+   * Get the recommended safe watermark (using configured confidence level)
+   *
+   * @param currentSystemTime Current system time, unit: milliseconds
+   * @return Safe watermark timestamp, unit: milliseconds
+   */
+  public long getSafeWatermark(long currentSystemTime) {
+    return getSafeWatermark(currentSystemTime, confidenceLevel);
+  }
+
+  /**
+   * Get current statistics
+   *
+   * @return String containing statistics such as P50, P95, P99, maximum 
value, etc.
+   */
+  public String getStatistics() {
+    if (!isFull && cursor == 0) {
+      return "[DelayAnalyzer] No data collected yet";
+    }
+
+    long p50 = getDelayQuantile(0.50);
+    long p95 = getDelayQuantile(0.95);
+    long p99 = getDelayQuantile(0.99);
+    long max = getDelayQuantile(1.00);
+    int currentSize = isFull ? windowSize : cursor;
+
+    return String.format(
+        "DelayAnalyzer Statistics -> Samples: %d/%d, P50: %dms, P95: %dms, 
P99: %dms, Max: %dms",
+        currentSize, windowSize, p50, p95, p99, max);
+  }
+
+  /** Print current statistics (for debugging) */
+  public void printStatistics() {
+    LOGGER.info(getStatistics());
+  }
+
+  /**
+   * Get the number of samples in the current window
+   *
+   * @return Current valid number of samples
+   */
+  public int getCurrentSampleCount() {
+    lock.readLock().lock();
+    try {
+      return isFull ? windowSize : cursor;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get total number of samples (including samples that have been overwritten)
+   *
+   * @return Total number of samples recorded since creation
+   */
+  public long getTotalSamples() {
+    lock.readLock().lock();
+    try {
+      return totalSamples;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get window size
+   *
+   * @return Window size
+   */
+  public int getWindowSize() {
+    return windowSize;
+  }
+
+  /** Reset the analyzer, clearing all statistical data */
+  public void reset() {
+    lock.writeLock().lock();
+    try {
+      Arrays.fill(delaySamples, 0);
+      cursor = 0;
+      isFull = false;
+      totalSamples = 0;
+      LOGGER.debug("[DelayAnalyzer] DelayAnalyzer has been reset");
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
new file mode 100644
index 00000000000..07ff339bbdc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for {@link DelayAnalyzer}: quantile calculation, sliding-window eviction, negative
+ * delay clamping, watermark derivation, window-size validation, statistics, reset and concurrent
+ * access.
+ */
+public class DelayAnalyzerTest {
+
+  /**
+   * Test 1: Basic functionality verification. Verifies simple data ingestion and P99 calculation to
+   * ensure CDF logic is correct.
+   */
+  @Test
+  public void testBasicQuantileCalculation() {
+    // Window size of 100
+    DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+
+    long now = System.currentTimeMillis();
+    // Ingest 101 delays ranging from 0ms to 100ms. The window only keeps the latest 100 samples,
+    // so the oldest one (0ms) is evicted and the window ends up holding delays 1..100.
+    for (int i = 0; i <= 100; i++) {
+      // arrival = now, generation = now - delay
+      analyzer.update(now - i, now);
+    }
+
+    long p50 = analyzer.getDelayQuantile(0.50);
+    long p99 = analyzer.getDelayQuantile(0.99);
+    long max = analyzer.getDelayQuantile(1.00);
+
+    // Allow 1ms calculation error for the median (depending on rounding logic)
+    Assert.assertTrue("P50 should be around 49, but was " + p50, Math.abs(p50 - 49) <= 1);
+    Assert.assertEquals("P99 should be 99", 99, p99);
+    Assert.assertEquals("Max should be 100", 100, max);
+  }
+
+  /**
+   * Test 2: Circular buffer eviction mechanism (sliding window). Corresponds to "dynamic delay
+   * distribution" mentioned in the paper. Scenario: Ingest low-latency data first, then
+   * high-latency data, verifying that old data is correctly evicted.
+   */
+  @Test
+  public void testCircularBufferEviction() {
+    // Extremely small window size: 5
+    DelayAnalyzer analyzer = new DelayAnalyzer(5, false);
+    long now = System.currentTimeMillis();
+
+    // Phase 1: Fill the window with small delays (10ms)
+    for (int i = 0; i < 5; i++) {
+      analyzer.update(now - 10, now);
+    }
+    Assert.assertEquals("Initial max delay should be 10", 10, analyzer.getDelayQuantile(1.0));
+
+    // Phase 2: Ingest new high-latency data (100ms)
+    // Writing 5 new points should evict all previous 10ms points
+    for (int i = 0; i < 5; i++) {
+      analyzer.update(now - 100, now);
+    }
+
+    // Verification: The minimum delay in the window should now be 100, old 10ms data should be
+    // evicted. If the circular logic is wrong, old data might still be read.
+    long minInWindow = analyzer.getDelayQuantile(0.01); // Approximate P0
+    Assert.assertEquals("Old data (10ms) should be evicted, min should be 100", 100, minInWindow);
+  }
+
+  /**
+   * Test 3: Negative delay handling (clock skew). The paper mentions clock skew can cause
+   * anomalies; the code should clamp negative values to 0.
+   */
+  @Test
+  public void testNegativeDelayHandling() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(1000, true);
+    long now = 10000;
+
+    // Generation time is later than arrival time (future time), simulating clock rollback or
+    // desynchronization
+    analyzer.update(now + 5000, now);
+
+    long maxDelay = analyzer.getDelayQuantile(1.0);
+    Assert.assertEquals("Negative delay should be clamped to 0", 0, maxDelay);
+  }
+
+  /**
+   * Test 4: Watermark calculation logic. Verifies if getSafeWatermark correctly uses the quantile.
+   */
+  @Test
+  public void testSafeWatermarkLogic() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+
+    // Inject a fixed delay of 500ms
+    analyzer.update(1000, 1500);
+
+    long currentSystemTime = 2000;
+    // P99 should be 500ms
+    // Watermark = Current(2000) - P99(500) = 1500
+    long watermark = analyzer.getSafeWatermark(currentSystemTime, 0.99);
+
+    Assert.assertEquals(1500, watermark);
+  }
+
+  /** Test 5: Empty buffer and boundary handling */
+  @Test
+  public void testEmptyAndBoundaries() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(1000, true);
+
+    // 1. When no data exists
+    Assert.assertEquals("Empty buffer should return 0", 0, analyzer.getDelayQuantile(0.99));
+
+    // 2. Only 1 data point
+    analyzer.update(100, 150); // delay 50
+    Assert.assertEquals(50, analyzer.getDelayQuantile(0.01));
+    Assert.assertEquals(50, analyzer.getDelayQuantile(0.99));
+
+    // 3. Illegal arguments: quantiles must lie in [0, 1]
+    Assert.assertThrows(IllegalArgumentException.class, () -> analyzer.getDelayQuantile(1.5));
+    Assert.assertThrows(IllegalArgumentException.class, () -> analyzer.getDelayQuantile(-0.1));
+  }
+
+  /**
+   * Test 6: Concurrency safety test. Simulates high-concurrency write scenarios to verify that the
+   * ReadWriteLock works effectively: no update is lost, and no exceptions or deadlocks occur.
+   */
+  @Test
+  public void testConcurrency() throws InterruptedException {
+    final int windowSize = 2000;
+    final DelayAnalyzer analyzer = new DelayAnalyzer(windowSize, true);
+    final int threadCount = 10;
+    final int writesPerThread = 5000;
+
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    CountDownLatch latch = new CountDownLatch(threadCount);
+    // java.util.Random is thread-safe, so one instance can be shared by the writer threads
+    final Random random = new Random();
+
+    try {
+      // Start 10 threads writing aggressively
+      for (int i = 0; i < threadCount; i++) {
+        executor.submit(
+            () -> {
+              try {
+                for (int j = 0; j < writesPerThread; j++) {
+                  long now = System.currentTimeMillis();
+                  // Random delay 0-99ms
+                  analyzer.update(now - random.nextInt(100), now);
+                }
+              } finally {
+                latch.countDown();
+              }
+            });
+      }
+
+      // Main thread attempts to read simultaneously
+      for (int i = 0; i < 10; i++) {
+        analyzer.getDelayQuantile(0.99);
+        Thread.sleep(10);
+      }
+
+      // Fail loudly instead of asserting on a partially written buffer if the writers hang
+      Assert.assertTrue("Writer threads did not finish in time", latch.await(5, TimeUnit.SECONDS));
+    } finally {
+      executor.shutdownNow();
+    }
+
+    // Exceptions inside submitted tasks are swallowed by their Futures, so verify the outcome:
+    // every write must have been counted exactly once and the window must be full.
+    Assert.assertEquals(
+        "No update should be lost",
+        (long) threadCount * writesPerThread,
+        analyzer.getTotalSamples());
+    Assert.assertEquals("Window should be full", windowSize, analyzer.getCurrentSampleCount());
+
+    // Reading must still work without IndexOutOfBounds or ConcurrentModificationException
+    long p99 = analyzer.getDelayQuantile(0.99);
+    Assert.assertTrue("P99 should be >= 0", p99 >= 0);
+    Assert.assertTrue("P99 should be <= 100", p99 <= 100);
+  }
+
+  /** Test 7: Default constructor and default confidence level */
+  @Test
+  public void testDefaultConstructor() {
+    DelayAnalyzer analyzer = new DelayAnalyzer();
+    Assert.assertEquals(
+        "Default window size should be " + DelayAnalyzer.DEFAULT_WINDOW_SIZE,
+        DelayAnalyzer.DEFAULT_WINDOW_SIZE,
+        analyzer.getWindowSize());
+
+    long now = System.currentTimeMillis();
+    analyzer.update(now - 100, now);
+
+    // Test default confidence level watermark calculation
+    long watermark1 = analyzer.getSafeWatermark(now);
+    long watermark2 = analyzer.getSafeWatermark(now, DelayAnalyzer.DEFAULT_CONFIDENCE_LEVEL);
+    Assert.assertEquals("Default confidence level should be consistent", watermark1, watermark2);
+  }
+
+  /** Test 8: Window size validation */
+  @Test
+  public void testWindowSizeValidation() {
+    // Test minimum window size
+    DelayAnalyzer analyzer1 = new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE, true);
+    Assert.assertEquals(DelayAnalyzer.MIN_WINDOW_SIZE, analyzer1.getWindowSize());
+
+    // Test maximum window size
+    DelayAnalyzer analyzer2 = new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE, true);
+    Assert.assertEquals(DelayAnalyzer.MAX_WINDOW_SIZE, analyzer2.getWindowSize());
+
+    // Test window size that is too small
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE - 1, true));
+
+    // Test window size that is too large
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE + 1, true));
+  }
+
+  /** Test 9: Statistics retrieval */
+  @Test
+  public void testStatistics() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+    long now = System.currentTimeMillis();
+
+    // Add some data
+    for (int i = 0; i < 50; i++) {
+      analyzer.update(now - i, now);
+    }
+
+    String stats = analyzer.getStatistics();
+    Assert.assertNotNull("Statistics should not be null", stats);
+    Assert.assertTrue("Statistics should contain P50", stats.contains("P50"));
+    Assert.assertTrue("Statistics should contain P99", stats.contains("P99"));
+
+    // Test empty statistics
+    DelayAnalyzer emptyAnalyzer = new DelayAnalyzer(1000, true);
+    String emptyStats = emptyAnalyzer.getStatistics();
+    Assert.assertTrue("Empty statistics should indicate no data", emptyStats.contains("No data"));
+  }
+
+  /** Test 10: Reset functionality */
+  @Test
+  public void testReset() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+    long now = System.currentTimeMillis();
+
+    // Add data
+    for (int i = 0; i < 50; i++) {
+      analyzer.update(now - i, now);
+    }
+
+    Assert.assertTrue("Should have data", analyzer.getCurrentSampleCount() > 0);
+    Assert.assertTrue("Total samples should be > 0", analyzer.getTotalSamples() > 0);
+
+    // Reset
+    analyzer.reset();
+
+    Assert.assertEquals(
+        "After reset, current sample count should be 0", 0, analyzer.getCurrentSampleCount());
+    Assert.assertEquals("After reset, total samples should be 0", 0, analyzer.getTotalSamples());
+    Assert.assertEquals(
+        "After reset, quantile should return 0", 0, analyzer.getDelayQuantile(0.99));
+  }
+
+  /** Test 11: Sample count functionality */
+  @Test
+  public void testSampleCount() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+    long now = System.currentTimeMillis();
+
+    // Initial state
+    Assert.assertEquals("Initial sample count should be 0", 0, analyzer.getCurrentSampleCount());
+    Assert.assertEquals("Initial total samples should be 0", 0, analyzer.getTotalSamples());
+
+    // Add 50 samples
+    for (int i = 0; i < 50; i++) {
+      analyzer.update(now - i, now);
+    }
+
+    Assert.assertEquals("Current sample count should be 50", 50, analyzer.getCurrentSampleCount());
+    Assert.assertEquals("Total samples should be 50", 50, analyzer.getTotalSamples());
+
+    // Fill the window
+    for (int i = 50; i < 150; i++) {
+      analyzer.update(now - i, now);
+    }
+
+    Assert.assertEquals(
+        "After window is full, current sample count should be 100",
+        100,
+        analyzer.getCurrentSampleCount());
+    Assert.assertEquals("Total samples should be 150", 150, analyzer.getTotalSamples());
+  }
+
+  /** Test 12: Accuracy of different quantiles */
+  @Test
+  public void testDifferentQuantiles() {
+    DelayAnalyzer analyzer = new DelayAnalyzer(1000, true);
+    long now = System.currentTimeMillis();
+
+    // Create uniformly distributed delays: 0-999ms
+    for (int i = 0; i < 1000; i++) {
+      analyzer.update(now - i, now);
+    }
+
+    // Verify different quantiles
+    long p10 = analyzer.getDelayQuantile(0.10);
+    long p25 = analyzer.getDelayQuantile(0.25);
+    long p50 = analyzer.getDelayQuantile(0.50);
+    long p75 = analyzer.getDelayQuantile(0.75);
+    long p90 = analyzer.getDelayQuantile(0.90);
+    long p99 = analyzer.getDelayQuantile(0.99);
+
+    // Allow 2ms error
+    Assert.assertTrue("P10 should be around 100", Math.abs(p10 - 100) <= 2);
+    Assert.assertTrue("P25 should be around 250", Math.abs(p25 - 250) <= 2);
+    Assert.assertTrue("P50 should be around 500", Math.abs(p50 - 500) <= 2);
+    Assert.assertTrue("P75 should be around 750", Math.abs(p75 - 750) <= 2);
+    Assert.assertTrue("P90 should be around 900", Math.abs(p90 - 900) <= 2);
+    Assert.assertTrue("P99 should be around 990", Math.abs(p99 - 990) <= 2);
+
+    // Verify monotonicity: quantiles should be increasing
+    Assert.assertTrue("P10 <= P25", p10 <= p25);
+    Assert.assertTrue("P25 <= P50", p25 <= p50);
+    Assert.assertTrue("P50 <= P75", p50 <= p75);
+    Assert.assertTrue("P75 <= P90", p75 <= p90);
+    Assert.assertTrue("P90 <= P99", p90 <= p99);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index f4ebae2fb80..3f1c4ab41ff 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1253,6 +1253,38 @@ unseq_memtable_flush_interval_in_ms=600000
 # Datatype: long
 unseq_memtable_flush_check_interval_in_ms=30000
 
+# Whether to enable the delay analyzer, which tracks data arrival delays and calculates safe watermarks.
+# The delay analyzer measures the delay between data generation time and arrival time, and uses the
+# resulting delay quantiles to calculate safe watermarks for handling out-of-order data.
+# effectiveMode: restart
+# Datatype: boolean
+enable_delay_analyzer=true
+
+# Default window size for the delay analyzer (empirical value: 10000 sample points).
+# A larger window provides more accurate P99 calculations but increases memory and sorting overhead.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_window_size=10000
+
+# Minimum window size for delay analyzer validation.
+# Window size must be between min and max window size.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_min_window_size=1000
+
+# Maximum window size for delay analyzer validation.
+# Window size must be between min and max window size.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_max_window_size=100000
+
+# Default confidence level for delay analyzer: 99% (0.99).
+# Used for calculating safe watermarks based on delay quantiles.
+# Range: (0, 1], e.g., 0.99 represents 99% quantile (P99).
+# effectiveMode: restart
+# Datatype: double
+delay_analyzer_confidence_level=0.99
+
 # The sort algorithms used in the memtable's TVList
 # TIM: default tim sort,
 # QUICK: quick sort,


Reply via email to