This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e7f80974364 Introduce delay analyzer to get watermark between sequence
data and unsequence data based on ICDE 2022 (#16886)
e7f80974364 is described below
commit e7f80974364329ea908e04fd630ffd43c145d6de
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Mar 11 20:38:58 2026 -0500
Introduce delay analyzer to get watermark between sequence data and
unsequence data based on ICDE 2022 (#16886)
* introduce delay analyzer based on ICDE 2022
* adopt to data-region
* fix ut
* fix
* fix review
* fix review
* empty commit
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 58 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 46 +++
.../db/storageengine/dataregion/DataRegion.java | 60 +++-
.../org/apache/iotdb/db/tools/DelayAnalyzer.java | 387 +++++++++++++++++++++
.../apache/iotdb/db/tools/DelayAnalyzerTest.java | 361 +++++++++++++++++++
.../conf/iotdb-system.properties.template | 32 ++
6 files changed, 943 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 98c15a2d9bf..ea2ceadfe4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -398,6 +398,24 @@ public class IoTDBConfig {
/** The interval to check whether unsequence memtables need flushing. Unit:
ms */
private long unseqMemtableFlushCheckInterval = 30 * 1000L;
+ /**
+ * Whether to enable delay analyzer for tracking data arrival delays and
calculating safe
+ * watermark
+ */
+ private boolean enableDelayAnalyzer = false;
+
+ /** Default window size for delay analyzer, empirical value: 10000 sample
points */
+ private int delayAnalyzerWindowSize = 10000;
+
+ /** Minimum window size for delay analyzer validation */
+ private int delayAnalyzerMinWindowSize = 1000;
+
+ /** Maximum window size for delay analyzer validation */
+ private int delayAnalyzerMaxWindowSize = 100000;
+
+ /** Default confidence level for delay analyzer: 99% */
+ private double delayAnalyzerConfidenceLevel = 0.99;
+
/** The sort algorithm used in TVList */
private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
@@ -2218,6 +2236,46 @@ public class IoTDBConfig {
this.unseqMemtableFlushCheckInterval = unseqMemtableFlushCheckInterval;
}
+ /** Returns the current value of the {@code enableDelayAnalyzer} switch (false by default). */
+ public boolean isEnableDelayAnalyzer() {
+ return enableDelayAnalyzer;
+ }
+
+ /** Stores the given flag as the {@code enableDelayAnalyzer} switch; no validation is done. */
+ public void setEnableDelayAnalyzer(boolean enableDelayAnalyzer) {
+ this.enableDelayAnalyzer = enableDelayAnalyzer;
+ }
+
+ /** Returns the configured delay analyzer window size in sample points (10000 by default). */
+ public int getDelayAnalyzerWindowSize() {
+ return delayAnalyzerWindowSize;
+ }
+
+ /** Stores the delay analyzer window size as given; the value is not range-checked here. */
+ public void setDelayAnalyzerWindowSize(int delayAnalyzerWindowSize) {
+ this.delayAnalyzerWindowSize = delayAnalyzerWindowSize;
+ }
+
+ /** Returns the minimum window size used for delay analyzer validation (1000 by default). */
+ public int getDelayAnalyzerMinWindowSize() {
+ return delayAnalyzerMinWindowSize;
+ }
+
+ /** Stores the minimum window size as given; it is not compared with the maximum here. */
+ public void setDelayAnalyzerMinWindowSize(int delayAnalyzerMinWindowSize) {
+ this.delayAnalyzerMinWindowSize = delayAnalyzerMinWindowSize;
+ }
+
+ /** Returns the maximum window size used for delay analyzer validation (100000 by default). */
+ public int getDelayAnalyzerMaxWindowSize() {
+ return delayAnalyzerMaxWindowSize;
+ }
+
+ /** Stores the maximum window size as given; it is not compared with the minimum here. */
+ public void setDelayAnalyzerMaxWindowSize(int delayAnalyzerMaxWindowSize) {
+ this.delayAnalyzerMaxWindowSize = delayAnalyzerMaxWindowSize;
+ }
+
+ /** Returns the configured delay analyzer confidence level (0.99, i.e. 99%, by default). */
+ public double getDelayAnalyzerConfidenceLevel() {
+ return delayAnalyzerConfidenceLevel;
+ }
+
+ /** Stores the confidence level as given; the value is not range-checked by this setter. */
+ public void setDelayAnalyzerConfidenceLevel(double
delayAnalyzerConfidenceLevel) {
+ this.delayAnalyzerConfidenceLevel = delayAnalyzerConfidenceLevel;
+ }
+
public TVListSortAlgorithm getTvListSortAlgorithm() {
return tvListSortAlgorithm;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6730138b2af..e086d429a4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1144,6 +1144,52 @@ public class IoTDBDescriptor {
properties.getProperty(
"include_null_value_in_write_throughput_metric",
String.valueOf(conf.isIncludeNullValueInWriteThroughputMetric()))));
+
+ conf.setEnableDelayAnalyzer(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_delay_analyzer",
+
ConfigurationFileUtils.getConfigurationDefaultValue("enable_delay_analyzer"))));
+
+ int delayAnalyzerWindowSize =
+ Integer.parseInt(
+ properties.getProperty(
+ "delay_analyzer_window_size",
+
ConfigurationFileUtils.getConfigurationDefaultValue("delay_analyzer_window_size")));
+ if (delayAnalyzerWindowSize > 0) {
+ LOGGER.warn("[DelayAnalyzer] Set delay_analyzer_window_size to {}",
delayAnalyzerWindowSize);
+ conf.setDelayAnalyzerWindowSize(delayAnalyzerWindowSize);
+ }
+
+ int delayAnalyzerMinWindowSize =
+ Integer.parseInt(
+ properties.getProperty(
+ "delay_analyzer_min_window_size",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "delay_analyzer_min_window_size")));
+ if (delayAnalyzerMinWindowSize > 0) {
+ conf.setDelayAnalyzerMinWindowSize(delayAnalyzerMinWindowSize);
+ }
+
+ int delayAnalyzerMaxWindowSize =
+ Integer.parseInt(
+ properties.getProperty(
+ "delay_analyzer_max_window_size",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "delay_analyzer_max_window_size")));
+ if (delayAnalyzerMaxWindowSize > 0) {
+ conf.setDelayAnalyzerMaxWindowSize(delayAnalyzerMaxWindowSize);
+ }
+
+ double delayAnalyzerConfidenceLevel =
+ Double.parseDouble(
+ properties.getProperty(
+ "delay_analyzer_confidence_level",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "delay_analyzer_confidence_level")));
+ if (delayAnalyzerConfidenceLevel > 0 && delayAnalyzerConfidenceLevel <= 1)
{
+ conf.setDelayAnalyzerConfidenceLevel(delayAnalyzerConfidenceLevel);
+ }
}
private void loadFixedSizeLimitForQuery(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 20d53439b95..a60c2722cd5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -156,6 +156,7 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
+import org.apache.iotdb.db.tools.DelayAnalyzer;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -379,6 +380,9 @@ public class DataRegion implements IDataRegionForQuery {
private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
+ /** Delay analyzer for tracking data arrival delays and calculating safe
watermarks */
+ private final DelayAnalyzer delayAnalyzer;
+
/**
* Construct a database processor.
*
@@ -397,6 +401,14 @@ public class DataRegion implements IDataRegionForQuery {
this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
+ this.delayAnalyzer =
+ config.isEnableDelayAnalyzer()
+ ? new DelayAnalyzer(
+ config.getDelayAnalyzerWindowSize(),
+ config.getDelayAnalyzerMinWindowSize(),
+ config.getDelayAnalyzerMaxWindowSize(),
+ config.getDelayAnalyzerConfidenceLevel())
+ : null;
acquireDirectBufferMemory();
dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionIdString);
@@ -463,6 +475,14 @@ public class DataRegion implements IDataRegionForQuery {
partitionMaxFileVersions.put(0L, 0L);
upgradeModFileThreadPool = null;
this.metrics = new DataRegionMetrics(this);
+ this.delayAnalyzer =
+ config.isEnableDelayAnalyzer()
+ ? new DelayAnalyzer(
+ config.getDelayAnalyzerWindowSize(),
+ config.getDelayAnalyzerMinWindowSize(),
+ config.getDelayAnalyzerMaxWindowSize(),
+ config.getDelayAnalyzerConfidenceLevel())
+ : null;
initDiskSelector();
}
@@ -1161,6 +1181,12 @@ public class DataRegion implements IDataRegionForQuery {
if (deleted) {
return;
}
+ if (delayAnalyzer != null) {
+ long arrivalTime = System.currentTimeMillis();
+ long generationTime = insertRowNode.getTime();
+ delayAnalyzer.update(generationTime, arrivalTime);
+ }
+
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
initFlushTimeMap(timePartitionId);
@@ -1346,6 +1372,13 @@ public class DataRegion implements IDataRegionForQuery {
"Won't insert tablet {}, because region is deleted",
insertTabletNode.getSearchIndex());
return;
}
+ if (delayAnalyzer != null) {
+ long arrivalTime = System.currentTimeMillis();
+ long[] times = insertTabletNode.getTimes();
+ for (long generationTime : times) {
+ delayAnalyzer.update(generationTime, arrivalTime);
+ }
+ }
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
long[] infoForMetrics = new long[5];
@@ -4403,6 +4436,10 @@ public class DataRegion implements IDataRegionForQuery {
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new
HashMap<>();
for (int i = 0; i <
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
+ if (delayAnalyzer != null) {
+ long arrivalTime = System.currentTimeMillis();
+ delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
+ }
if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
// we do not need to write these part of data, as they can not be
queried
// or the sub-plan has already been executed, we are retrying other
sub-plans
@@ -4522,6 +4559,10 @@ public class DataRegion implements IDataRegionForQuery {
long[] timePartitionIds = new
long[insertRowsNode.getInsertRowNodeList().size()];
for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsNode.getInsertRowNodeList().get(i);
+ if (delayAnalyzer != null) {
+ long arrivalTime = System.currentTimeMillis();
+ delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
+ }
long ttl = getTTL(insertRowNode);
if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
insertRowsNode
@@ -4610,9 +4651,16 @@ public class DataRegion implements IDataRegionForQuery {
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
for (int i = 0; i <
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
InsertTabletNode insertTabletNode =
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
+ long[] times = insertTabletNode.getTimes();
+ if (delayAnalyzer != null) {
+ for (long generationTime : times) {
+ long arrivalTime = System.currentTimeMillis();
+ delayAnalyzer.update(generationTime, arrivalTime);
+ }
+ }
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = false;
@@ -4916,6 +4964,16 @@ public class DataRegion implements IDataRegionForQuery {
return tsFileManager;
}
+ /**
+ * Returns the delay analyzer held by this data region.
+ *
+ * <p>The field is assigned once in the constructor: a new {@code DelayAnalyzer} when
+ * {@code config.isEnableDelayAnalyzer()} is true, otherwise {@code null}. Callers must
+ * therefore null-check the result.
+ *
+ * @return the DelayAnalyzer tracking data arrival delays for this region, or null if the delay
+ * analyzer is disabled
+ */
+ public DelayAnalyzer getDelayAnalyzer() {
+ return delayAnalyzer;
+ }
+
private long getTTL(InsertNode insertNode) {
if (insertNode.getTableName() == null) {
return
DataNodeTTLCache.getInstance().getTTLForTree(insertNode.getTargetPath().getNodes());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
new file mode 100644
index 00000000000..bb76dac6c22
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * DelayAnalyzer: Calculate watermarks for in-order and out-of-order data
based on statistics and
+ * models
+ *
+ * <p>This implementation is based on the paper "Separation or Not: On
Handling Out-of-Order
+ * Time-Series Data" (ICDE 2022) proposed method, which dynamically calculates
safe watermarks by
+ * analyzing the Cumulative Distribution Function (CDF) of data arrival delays.
+ *
+ * <p>Core concepts:
+ *
+ * <ul>
+ * <li>Record the generation time (Event Time) and arrival time (System
Time) of each data point
+ * <li>Calculate delay: delay = arrivalTime - generationTime
+ * <li>Maintain a sliding window to store recent delay samples
+ * <li>Use CDF to calculate delay quantiles (e.g., P99) at specific
confidence levels
+ * <li>Safe watermark = Current system time - P99 delay
+ * </ul>
+ *
+ * <p>Key formulas from the paper:
+ *
+ * <ul>
+ * <li>Delay calculation: p.t_d = p.t_a - p.t_g (Formula 160)
+ * <li>Use F(x) (CDF) to estimate the probability of out-of-order data
(Reference 110)
+ * <li>Dynamic delay distribution: Use sliding window to maintain recent
delay samples
+ * </ul>
+ *
+ * <p>Features:
+ *
+ * <ul>
+ * <li>Thread-safe: Uses ReadWriteLock to support high-concurrency
read/write operations
+ * <li>Memory efficient: Uses circular buffer with fixed memory footprint
+ * <li>Clock skew handling: Automatically handles negative delays (clock
rollback)
+ * <li>Dynamic adaptation: Sliding window automatically adapts to changes in
delay distribution
+ * </ul>
+ *
+ * @see <a href="https://ieeexplore.ieee.org/document/9835234">ICDE 2022
Paper</a>
+ */
+public class DelayAnalyzer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DelayAnalyzer.class);
+
+ /** Default window size, empirical value: 10000 sample points */
+ public static final int DEFAULT_WINDOW_SIZE = 10000;
+
+ /** Recommended window size range */
+ public static final int MIN_WINDOW_SIZE = 1000;
+
+ public static final int MAX_WINDOW_SIZE = 100000;
+
+ /** Default confidence level: 99% */
+ public static final double DEFAULT_CONFIDENCE_LEVEL = 0.99;
+
+ /**
+ * Sliding window size for storing recent delay sample points. A larger
window provides more
+ * accurate P99 calculations but increases memory and sorting overhead. The
paper mentions using
+ * dynamic delay distribution to adapt to changes in delay patterns.
+ */
+ private final int windowSize;
+
+ /** Circular buffer storing delay samples (unit: milliseconds) */
+ private final long[] delaySamples;
+
+ /** Current write position (cursor of the circular buffer) */
+ private int cursor = 0;
+
+ /** Flag indicating whether the buffer is full */
+ private boolean isFull = false;
+
+ /** Total number of samples (for statistics) */
+ private long totalSamples = 0;
+
+ /**
+ * ReadWriteLock ensures thread safety in high-concurrency scenarios. Read
operations (calculating
+ * quantiles) use read lock, write operations (updating delays) use write
lock.
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Confidence level for calculating safe watermarks */
+ private final double confidenceLevel;
+
+ /** Constructor using default window size */
+ public DelayAnalyzer() {
+ this(DEFAULT_WINDOW_SIZE, true);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param windowSize Sliding window size, recommended range: 5000 - 10000.
Larger windows provide
+ * more accurate statistics but increase memory and computational
overhead.
+ * @throws IllegalArgumentException if window size is not within valid range
+ */
+ public DelayAnalyzer(int windowSize, boolean checkSize) {
+ if (checkSize) {
+ if (windowSize < MIN_WINDOW_SIZE || windowSize > MAX_WINDOW_SIZE) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Window size must be between %d and %d, got %d",
+ MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, windowSize));
+ }
+ }
+
+ this.windowSize = windowSize;
+ this.delaySamples = new long[windowSize];
+ this.confidenceLevel = DEFAULT_CONFIDENCE_LEVEL;
+ }
+
+ /**
+ * Constructor with configurable window size and validation range
+ *
+ * @param windowSize Sliding window size, recommended range: 5000 - 10000.
Larger windows provide
+ * more accurate statistics but increase memory and computational
overhead.
+ * @param minWindowSize Minimum window size for validation
+ * @param maxWindowSize Maximum window size for validation
+ * @throws IllegalArgumentException if window size is not within valid range
+ */
+ public DelayAnalyzer(int windowSize, int minWindowSize, int maxWindowSize) {
+ this(windowSize, minWindowSize, maxWindowSize, DEFAULT_CONFIDENCE_LEVEL);
+ }
+
+ /**
+ * Constructor with configurable window size, validation range and
confidence level
+ *
+ * @param windowSize Sliding window size, recommended range: 5000 - 10000.
Larger windows provide
+ * more accurate statistics but increase memory and computational
overhead.
+ * @param minWindowSize Minimum window size for validation
+ * @param maxWindowSize Maximum window size for validation
+ * @param confidenceLevel Confidence level for calculating safe watermarks,
range: (0, 1]
+ * @throws IllegalArgumentException if window size is not within valid range
or confidence level
+ * is invalid
+ */
+ public DelayAnalyzer(
+ int windowSize, int minWindowSize, int maxWindowSize, double
confidenceLevel) {
+ if (windowSize < minWindowSize || windowSize > maxWindowSize) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Window size must be between %d and %d, got %d",
+ minWindowSize, maxWindowSize, windowSize));
+ }
+ if (confidenceLevel <= 0 || confidenceLevel > 1) {
+ throw new IllegalArgumentException(
+ String.format("Confidence level must be between 0 and 1, got %f",
confidenceLevel));
+ }
+
+ this.windowSize = windowSize;
+ this.delaySamples = new long[windowSize];
+ this.confidenceLevel = confidenceLevel;
+ }
+
+ /**
+ * Core method: Record a new data point
+ *
+ * <p>Corresponds to the formula in the paper: p.t_d = p.t_a - p.t_g
(Formula 160)
+ *
+ * <p>Delay calculation logic:
+ *
+ * <ul>
+ * <li>delay = arrivalTime - generationTime
+ * <li>If delay < 0, it indicates clock skew or out-of-order data, set it
to 0 (Paper Reference
+ * 26: clock skew correction)
+ * </ul>
+ *
+ * @param generationTime Data generation time (Event Time), unit:
milliseconds
+ * @param arrivalTime Data arrival time (System Time), unit: milliseconds
+ */
+ public void update(long generationTime, long arrivalTime) {
+ // Calculate delay; delay cannot be negative (clock skew correction, Paper
Reference 26)
+ long delay = Math.max(0, arrivalTime - generationTime);
+
+ lock.writeLock().lock();
+ try {
+ delaySamples[cursor] = delay;
+ cursor++;
+ totalSamples++;
+
+ // Circular buffer: when cursor reaches window size, reset to 0 and mark
as full
+ if (cursor >= windowSize) {
+ cursor = 0;
+ isFull = true;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Get the delay threshold for a specific quantile
+ *
+ * <p>Corresponds to the method in the paper using F(x) (CDF) to estimate
the probability of
+ * out-of-order data (Reference 110).
+ *
+ * <p>Calculation steps:
+ *
+ * <ol>
+ * <li>Get a snapshot of the current window (to avoid holding write lock
for too long)
+ * <li>Sort the samples
+ * <li>Calculate index position based on quantile
+ * <li>Return the delay value at the corresponding position
+ * </ol>
+ *
+ * @param percentile Quantile, range (0, 1], e.g., 0.99 represents 99%
quantile (P99)
+ * @return Delay threshold, unit: milliseconds
+ * @throws IllegalArgumentException if percentile is not within valid range
+ */
+ public long getDelayQuantile(double percentile) {
+ if (percentile <= 0 || percentile > 1) {
+ throw new IllegalArgumentException(
+ String.format("Percentile must be between 0 and 1, got %f",
percentile));
+ }
+
+ long[] snapshot;
+ int currentSize;
+
+ // Get snapshot to avoid blocking write lock for too long
+ lock.readLock().lock();
+ try {
+ if (!isFull && cursor == 0) {
+ // No data yet
+ return 0;
+ }
+ currentSize = isFull ? windowSize : cursor;
+ snapshot = Arrays.copyOf(delaySamples, currentSize);
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ // Sort to calculate quantile (Arrays.sort performance is sufficient for
small datasets)
+ Arrays.sort(snapshot);
+
+ // Calculate index corresponding to quantile
+ int index = (int) Math.ceil(currentSize * percentile) - 1;
+ // Boundary correction to prevent index out of bounds
+ index = Math.max(0, Math.min(index, currentSize - 1));
+
+ return snapshot[index];
+ }
+
+ /**
+ * Get the recommended safe watermark
+ *
+ * <p>Definition: At confidence level confidenceLevel, the probability that
all data before this
+ * timestamp has arrived.
+ *
+ * <p>Calculation formula:
+ *
+ * <pre>
+ * SafeWatermark = CurrentSystemTime - P99_Delay
+ * </pre>
+ *
+ * <p>Meaning: If current system time is T and P99 delay is D, then data
before timestamp T-D has
+ * a 99% probability of having all arrived.
+ *
+ * @param currentSystemTime Current system time, unit: milliseconds
+ * @param confidenceLevel Confidence level, e.g., 0.99 represents 99%
confidence
+ * @return Safe watermark timestamp, unit: milliseconds
+ */
+ public long getSafeWatermark(long currentSystemTime, double confidenceLevel)
{
+ long pDelay = getDelayQuantile(confidenceLevel);
+ // Watermark = Current time - P99 delay
+ long watermark = currentSystemTime - pDelay;
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "[DelayAnalyzer] Calculated safe watermark: {} (currentTime: {}, P{}
delay: {}ms)",
+ watermark,
+ currentSystemTime,
+ (int) (confidenceLevel * 100),
+ pDelay);
+ }
+
+ return watermark;
+ }
+
+ /**
+ * Get the recommended safe watermark (using configured confidence level)
+ *
+ * @param currentSystemTime Current system time, unit: milliseconds
+ * @return Safe watermark timestamp, unit: milliseconds
+ */
+ public long getSafeWatermark(long currentSystemTime) {
+ return getSafeWatermark(currentSystemTime, confidenceLevel);
+ }
+
+ /**
+ * Get current statistics
+ *
+ * @return String containing statistics such as P50, P95, P99, maximum
value, etc.
+ */
+ public String getStatistics() {
+ if (!isFull && cursor == 0) {
+ return "[DelayAnalyzer] No data collected yet";
+ }
+
+ long p50 = getDelayQuantile(0.50);
+ long p95 = getDelayQuantile(0.95);
+ long p99 = getDelayQuantile(0.99);
+ long max = getDelayQuantile(1.00);
+ int currentSize = isFull ? windowSize : cursor;
+
+ return String.format(
+ "DelayAnalyzer Statistics -> Samples: %d/%d, P50: %dms, P95: %dms,
P99: %dms, Max: %dms",
+ currentSize, windowSize, p50, p95, p99, max);
+ }
+
+ /** Print current statistics (for debugging) */
+ public void printStatistics() {
+ LOGGER.info(getStatistics());
+ }
+
+ /**
+ * Get the number of samples in the current window
+ *
+ * @return Current valid number of samples
+ */
+ public int getCurrentSampleCount() {
+ lock.readLock().lock();
+ try {
+ return isFull ? windowSize : cursor;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get total number of samples (including samples that have been overwritten)
+ *
+ * @return Total number of samples recorded since creation
+ */
+ public long getTotalSamples() {
+ lock.readLock().lock();
+ try {
+ return totalSamples;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get window size
+ *
+ * @return Window size
+ */
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ /** Reset the analyzer, clearing all statistical data */
+ public void reset() {
+ lock.writeLock().lock();
+ try {
+ Arrays.fill(delaySamples, 0);
+ cursor = 0;
+ isFull = false;
+ totalSamples = 0;
+ LOGGER.debug("[DelayAnalyzer] DelayAnalyzer has been reset");
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
new file mode 100644
index 00000000000..07ff339bbdc
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for DelayAnalyzer */
+public class DelayAnalyzerTest {
+
+ /**
+  * Test 1: Basic functionality verification. Verifies simple data ingestion and quantile
+  * calculation on a known, evenly spaced set of delays.
+  */
+ @Test
+ public void testBasicQuantileCalculation() {
+   // Window size of 100
+   DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+
+   long now = System.currentTimeMillis();
+   // Ingest 101 delays, 0ms..100ms. The window only holds 100 samples, so the 101st write
+   // (delay 100) overwrites the oldest one (delay 0): the window ends up holding 1..100.
+   for (int i = 0; i <= 100; i++) {
+     // arrival = now, generation = now - delay
+     analyzer.update(now - i, now);
+   }
+
+   // With sorted samples 1..100 and index = ceil(100 * q) - 1:
+   // P50 -> index 49 -> 50, P99 -> index 98 -> 99, P100 -> index 99 -> 100
+   long p50 = analyzer.getDelayQuantile(0.50);
+   long p99 = analyzer.getDelayQuantile(0.99);
+   long max = analyzer.getDelayQuantile(1.00);
+
+   Assert.assertEquals("P50 should be 50", 50, p50);
+   Assert.assertEquals("P99 should be 99", 99, p99);
+   Assert.assertEquals("Max should be 100", 100, max);
+   Assert.assertEquals("Window should be full", 100, analyzer.getCurrentSampleCount());
+   Assert.assertEquals("All writes should be counted", 101, analyzer.getTotalSamples());
+ }
+
+ /**
+  * Test 2: Sliding-window eviction. A tiny window is filled with low-latency samples and then
+  * with the same number of high-latency samples; afterwards only the high-latency samples may
+  * remain visible.
+  */
+ @Test
+ public void testCircularBufferEviction() {
+   final int capacity = 5; // extremely small window
+   final long arrival = System.currentTimeMillis();
+   DelayAnalyzer analyzer = new DelayAnalyzer(capacity, false);
+
+   // Phase 1: fill the whole window with 10ms delays
+   int written = 0;
+   while (written < capacity) {
+     analyzer.update(arrival - 10, arrival);
+     written++;
+   }
+   long maxAfterFirstFill = analyzer.getDelayQuantile(1.0);
+   Assert.assertEquals("Initial max delay should be 10", 10, maxAfterFirstFill);
+
+   // Phase 2: one full window of 100ms delays must push out every 10ms sample
+   written = 0;
+   while (written < capacity) {
+     analyzer.update(arrival - 100, arrival);
+     written++;
+   }
+
+   // The lowest quantile approximates the window minimum; a stale 10ms value would show up here
+   long lowest = analyzer.getDelayQuantile(0.01);
+   Assert.assertEquals("Old data (10ms) should be evicted, min should be 100", 100, lowest);
+ }
+
+ /**
+  * Test 3: Negative delay handling. A sample whose generation time lies after its arrival time
+  * (clock skew) must be recorded as a delay of 0.
+  */
+ @Test
+ public void testNegativeDelayHandling() {
+   final long arrival = 10000;
+   // Generated "in the future" relative to arrival: raw delay would be -5000
+   final long generation = arrival + 5000;
+
+   DelayAnalyzer analyzer = new DelayAnalyzer(1000, true);
+   analyzer.update(generation, arrival);
+
+   Assert.assertEquals(
+       "Negative delay should be clamped to 0", 0, analyzer.getDelayQuantile(1.0));
+ }
+
+ /**
+  * Test 4: Watermark calculation. The watermark must equal the supplied current time minus the
+  * requested delay quantile.
+  */
+ @Test
+ public void testSafeWatermarkLogic() {
+   final long generation = 1000;
+   final long arrival = 1500; // single sample with a 500ms delay
+   final long currentTime = 2000;
+
+   DelayAnalyzer analyzer = new DelayAnalyzer(100, false);
+   analyzer.update(generation, arrival);
+
+   // With one sample every quantile is 500ms, so watermark = 2000 - 500 = 1500
+   Assert.assertEquals(1500, analyzer.getSafeWatermark(currentTime, 0.99));
+ }
+
+ /** Test 5: Empty buffer and boundary handling */
+ @Test
+ public void testEmptyAndBoundaries() {
+ DelayAnalyzer analyzer = new DelayAnalyzer(1000, true);
+
+ // 1. When no data exists
+ Assert.assertEquals("Empty buffer should return 0", 0,
analyzer.getDelayQuantile(0.99));
+
+ // 2. Only 1 data point
+ analyzer.update(100, 150); // delay 50
+ Assert.assertEquals(50, analyzer.getDelayQuantile(0.01));
+ Assert.assertEquals(50, analyzer.getDelayQuantile(0.99));
+
+ // 3. Illegal arguments
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ analyzer.getDelayQuantile(1.5);
+ });
+
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ analyzer.getDelayQuantile(-0.1);
+ });
+ }
+
+ /**
+  * Test 6: Concurrency safety test. Ten writer threads update the analyzer while the main thread
+  * reads quantiles; afterwards the counters and the P99 must be consistent with what was
+  * written.
+  */
+ @Test
+ public void testConcurrency() throws InterruptedException {
+   int windowSize = 2000;
+   final DelayAnalyzer analyzer = new DelayAnalyzer(windowSize, true);
+   int threadCount = 10;
+   final int writesPerThread = 5000;
+
+   ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+   CountDownLatch latch = new CountDownLatch(threadCount);
+   final Random random = new Random();
+
+   try {
+     // Start 10 threads writing aggressively
+     for (int i = 0; i < threadCount; i++) {
+       executor.submit(
+           () -> {
+             try {
+               for (int j = 0; j < writesPerThread; j++) {
+                 long now = System.currentTimeMillis();
+                 // Random delay 0-99ms
+                 analyzer.update(now - random.nextInt(100), now);
+               }
+             } finally {
+               latch.countDown();
+             }
+           });
+     }
+
+     // Main thread attempts to read simultaneously
+     for (int i = 0; i < 10; i++) {
+       analyzer.getDelayQuantile(0.99);
+       Thread.sleep(10);
+     }
+
+     // await() returns false on timeout; without this check a hung writer would go unnoticed
+     Assert.assertTrue(
+         "Writer threads should finish within 5 seconds", latch.await(5, TimeUnit.SECONDS));
+   } finally {
+     // Do not leak worker threads into other tests if an assertion above fails
+     executor.shutdownNow();
+   }
+
+   // Verification: no update may be lost and the window must be exactly full
+   Assert.assertEquals(
+       "Every write should be counted",
+       (long) threadCount * writesPerThread,
+       analyzer.getTotalSamples());
+   Assert.assertEquals("Window should be full", windowSize, analyzer.getCurrentSampleCount());
+
+   long p99 = analyzer.getDelayQuantile(0.99);
+
+   Assert.assertTrue("P99 should be >= 0", p99 >= 0);
+   Assert.assertTrue("P99 should be <= 100", p99 <= 100);
+ }
+
+  /** Test 7: The no-arg constructor and the one-arg getSafeWatermark() fall back to defaults. */
+  @Test
+  public void testDefaultConstructor() {
+    final DelayAnalyzer defaultAnalyzer = new DelayAnalyzer();
+    final String windowMessage =
+        "Default window size should be " + DelayAnalyzer.DEFAULT_WINDOW_SIZE;
+    Assert.assertEquals(
+        windowMessage, DelayAnalyzer.DEFAULT_WINDOW_SIZE, defaultAnalyzer.getWindowSize());
+
+    final long arrivalTime = System.currentTimeMillis();
+    defaultAnalyzer.update(arrivalTime - 100, arrivalTime);
+
+    // Omitting the confidence level must match passing DEFAULT_CONFIDENCE_LEVEL explicitly.
+    final long implicitWatermark = defaultAnalyzer.getSafeWatermark(arrivalTime);
+    final long explicitWatermark =
+        defaultAnalyzer.getSafeWatermark(arrivalTime, DelayAnalyzer.DEFAULT_CONFIDENCE_LEVEL);
+    Assert.assertEquals(
+        "Default confidence level should be consistent", implicitWatermark, explicitWatermark);
+  }
+
+  /** Test 8: Window sizes on the bounds are accepted; sizes just outside them are rejected. */
+  @Test
+  public void testWindowSizeValidation() {
+    // Lower bound is accepted and reported back unchanged.
+    final DelayAnalyzer smallest = new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE, true);
+    Assert.assertEquals(DelayAnalyzer.MIN_WINDOW_SIZE, smallest.getWindowSize());
+
+    // Upper bound is accepted and reported back unchanged.
+    final DelayAnalyzer largest = new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE, true);
+    Assert.assertEquals(DelayAnalyzer.MAX_WINDOW_SIZE, largest.getWindowSize());
+
+    // One below the minimum must be rejected.
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE - 1, true));
+
+    // One above the maximum must be rejected.
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE + 1, true));
+  }
+
+  /** Test 9: getStatistics() output for a populated analyzer and for an empty one. */
+  @Test
+  public void testStatistics() {
+    final DelayAnalyzer populated = new DelayAnalyzer(100, false);
+    final long arrival = System.currentTimeMillis();
+
+    // Record 50 samples whose generation times trail the arrival time by 0..49.
+    int recorded = 0;
+    while (recorded < 50) {
+      populated.update(arrival - recorded, arrival);
+      recorded++;
+    }
+
+    final String report = populated.getStatistics();
+    Assert.assertNotNull("Statistics should not be null", report);
+    Assert.assertTrue("Statistics should contain P50", report.contains("P50"));
+    Assert.assertTrue("Statistics should contain P99", report.contains("P99"));
+
+    // An analyzer that never received a sample must say so in its report.
+    final String emptyReport = new DelayAnalyzer(1000, true).getStatistics();
+    Assert.assertTrue("Empty statistics should indicate no data", emptyReport.contains("No data"));
+  }
+
+  /** Test 10: reset() clears the buffered samples, the running total, and the quantile. */
+  @Test
+  public void testReset() {
+    final DelayAnalyzer delayAnalyzer = new DelayAnalyzer(100, false);
+    final long arrival = System.currentTimeMillis();
+
+    // Seed 50 samples so there is state to clear.
+    for (int offset = 0; offset != 50; offset++) {
+      delayAnalyzer.update(arrival - offset, arrival);
+    }
+
+    final boolean hasBufferedSamples = delayAnalyzer.getCurrentSampleCount() > 0;
+    Assert.assertTrue("Should have data", hasBufferedSamples);
+    final boolean hasCountedSamples = delayAnalyzer.getTotalSamples() > 0;
+    Assert.assertTrue("Total samples should be > 0", hasCountedSamples);
+
+    delayAnalyzer.reset();
+
+    // Every observable counter must be back at zero.
+    Assert.assertEquals(
+        "After reset, current sample count should be 0",
+        0,
+        delayAnalyzer.getCurrentSampleCount());
+    Assert.assertEquals(
+        "After reset, total samples should be 0", 0, delayAnalyzer.getTotalSamples());
+    Assert.assertEquals(
+        "After reset, quantile should return 0", 0, delayAnalyzer.getDelayQuantile(0.99));
+  }
+
+  /**
+   * Test 11: getCurrentSampleCount() is capped at the window size while getTotalSamples() keeps
+   * counting every update.
+   */
+  @Test
+  public void testSampleCount() {
+    final DelayAnalyzer delayAnalyzer = new DelayAnalyzer(100, false);
+    final long arrival = System.currentTimeMillis();
+
+    // Before any update both counters are zero.
+    Assert.assertEquals(
+        "Initial sample count should be 0", 0, delayAnalyzer.getCurrentSampleCount());
+    Assert.assertEquals("Initial total samples should be 0", 0, delayAnalyzer.getTotalSamples());
+
+    // First 50 updates: fewer than the window size of 100, so both counters read 50.
+    int offset = 0;
+    while (offset < 50) {
+      delayAnalyzer.update(arrival - offset, arrival);
+      offset++;
+    }
+
+    Assert.assertEquals(
+        "Current sample count should be 50", 50, delayAnalyzer.getCurrentSampleCount());
+    Assert.assertEquals("Total samples should be 50", 50, delayAnalyzer.getTotalSamples());
+
+    // 100 further updates (150 in total) exceed the window size.
+    while (offset < 150) {
+      delayAnalyzer.update(arrival - offset, arrival);
+      offset++;
+    }
+
+    Assert.assertEquals(
+        "After window is full, current sample count should be 100",
+        100,
+        delayAnalyzer.getCurrentSampleCount());
+    Assert.assertEquals("Total samples should be 150", 150, delayAnalyzer.getTotalSamples());
+  }
+
+  /** Test 12: Quantiles of evenly spaced delays land near the expected values and are ordered. */
+  @Test
+  public void testDifferentQuantiles() {
+    final DelayAnalyzer delayAnalyzer = new DelayAnalyzer(1000, true);
+    final long arrival = System.currentTimeMillis();
+
+    // One sample for each delay 0..999.
+    for (int delay = 0; delay < 1000; delay++) {
+      delayAnalyzer.update(arrival - delay, arrival);
+    }
+
+    // Parallel tables: percentile label, quantile argument, and expected delay.
+    final int[] percentiles = {10, 25, 50, 75, 90, 99};
+    final double[] quantiles = {0.10, 0.25, 0.50, 0.75, 0.90, 0.99};
+    final long[] expected = {100, 250, 500, 750, 900, 990};
+
+    // Query every quantile up front, before any assertion runs.
+    final long[] observed = new long[quantiles.length];
+    for (int k = 0; k < quantiles.length; k++) {
+      observed[k] = delayAnalyzer.getDelayQuantile(quantiles[k]);
+    }
+
+    // Each quantile may deviate from its expected value by at most 2.
+    for (int k = 0; k < observed.length; k++) {
+      String message = "P" + percentiles[k] + " should be around " + expected[k];
+      Assert.assertTrue(message, Math.abs(observed[k] - expected[k]) <= 2);
+    }
+
+    // Quantiles must be non-decreasing as the level grows.
+    for (int k = 0; k + 1 < observed.length; k++) {
+      String message = "P" + percentiles[k] + " <= P" + percentiles[k + 1];
+      Assert.assertTrue(message, observed[k] <= observed[k + 1]);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index f4ebae2fb80..3f1c4ab41ff 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1253,6 +1253,38 @@ unseq_memtable_flush_interval_in_ms=600000
# Datatype: long
unseq_memtable_flush_check_interval_in_ms=30000
+# Whether to enable delay analyzer for tracking data arrival delays and calculating safe watermarks.
+# The delay analyzer tracks the delay between data generation time and arrival time to calculate
+# safe watermarks for handling out-of-order data.
+# effectiveMode: restart
+# Datatype: boolean
+enable_delay_analyzer=true
+
+# Default window size for delay analyzer, empirical value: 10000 sample points.
+# A larger window provides more accurate P99 calculations but increases memory and sorting overhead.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_window_size=10000
+
+# Minimum window size for delay analyzer validation.
+# Window size must be between min and max window size.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_min_window_size=1000
+
+# Maximum window size for delay analyzer validation.
+# Window size must be between min and max window size.
+# effectiveMode: restart
+# Datatype: int
+delay_analyzer_max_window_size=100000
+
+# Default confidence level for delay analyzer: 99% (0.99).
+# Used for calculating safe watermarks based on delay quantiles.
+# Range: (0, 1], e.g., 0.99 represents 99% quantile (P99).
+# effectiveMode: restart
+# Datatype: double
+delay_analyzer_confidence_level=0.99
+
# The sort algorithms used in the memtable's TVList
# TIM: default tim sort,
# QUICK: quick sort,