This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/change_point_in_window
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 195f439d93fd9f956aca85ed25ffeed9f6013e41
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Sun Feb 22 00:21:36 2026 +0800

    Add ChangePoint operator and its UT
---
 .../operator/source/ChangePointOperator.java       | 537 +++++++++++++++++++
 .../operator/ChangePointOperatorTest.java          | 576 +++++++++++++++++++++
 2 files changed, 1113 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java
new file mode 100644
index 00000000000..71323b15f38
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ChangePointOperator.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source;
+
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ChangePointOperator removes consecutive identical values from a time 
series, keeping only the
+ * first occurrence when a value changes. It leverages TsFile statistics (min 
== max) at
+ * file/chunk/page level to skip segments where all values are the same as the 
previously seen
+ * value, avoiding unnecessary I/O.
+ */
+@SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
+public class ChangePointOperator extends AbstractDataSourceOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ChangePointOperator.class);
+
+  private final TSDataType dataType;
+  private final boolean canUseStatistics;
+
+  private boolean finished = false;
+  private boolean isFirstPoint = true;
+
+  private boolean cacheBoolean;
+  private int cacheInt;
+  private long cacheLong;
+  private float cacheFloat;
+  private double cacheDouble;
+  private Binary cacheBinary;
+
+  public ChangePointOperator(
+      OperatorContext context,
+      PlanNodeId sourceId,
+      IFullPath seriesPath,
+      Ordering scanOrder,
+      SeriesScanOptions seriesScanOptions,
+      boolean canUseStatistics) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.dataType = seriesPath.getSeriesType();
+    this.canUseStatistics = canUseStatistics;
+    this.seriesScanUtil =
+        new SeriesScanUtil(seriesPath, scanOrder, seriesScanOptions, 
context.getInstanceContext());
+    this.maxReturnSize =
+        Math.min(maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
+  }
+
+  // ======================== Operator interface ========================
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (retainedTsBlock != null) {
+      return getResultFromRetainedTsBlock();
+    }
+    if (resultTsBlockBuilder.isEmpty()) {
+      return null;
+    }
+    resultTsBlock = resultTsBlockBuilder.build();
+    resultTsBlockBuilder.reset();
+    return checkTsBlockSizeAndGetResult();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (retainedTsBlock != null) {
+      return true;
+    }
+    try {
+      long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      long start = System.nanoTime();
+      boolean noMoreData = false;
+
+      do {
+        if (readPageData()) {
+          continue;
+        }
+        Optional<Boolean> b = readChunkData();
+        if (!b.isPresent() || b.get()) {
+          continue;
+        }
+        b = readFileData();
+        if (!b.isPresent() || b.get()) {
+          continue;
+        }
+        noMoreData = true;
+        break;
+      } while (System.nanoTime() - start < maxRuntime
+          && !resultTsBlockBuilder.isFull()
+          && retainedTsBlock == null);
+
+      finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null && 
noMoreData);
+      return !finished;
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return finished;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no resources to release
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
+        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes());
+  }
+
+  @Override
+  protected List<TSDataType> getResultDataTypes() {
+    return seriesScanUtil.getTsDataTypeList();
+  }
+
+  // ======================== Hierarchical scanning with statistics 
========================
+
+  private Optional<Boolean> readFileData() throws IOException {
+    Optional<Boolean> b = seriesScanUtil.hasNextFile();
+    if (!b.isPresent() || !b.get()) {
+      return b;
+    }
+
+    if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) {
+      Statistics fileStatistics = seriesScanUtil.currentFileStatistics(0);
+      if (fileStatistics != null && isUniformSegment(fileStatistics)) {
+        handleUniformSegment(
+            seriesScanUtil.currentFileTimeStatistics().getStartTime(), 
fileStatistics);
+        seriesScanUtil.skipCurrentFile();
+        return Optional.of(true);
+      }
+    }
+
+    b = readChunkData();
+    if (!b.isPresent() || b.get()) {
+      return b;
+    }
+    return Optional.empty();
+  }
+
+  private Optional<Boolean> readChunkData() throws IOException {
+    Optional<Boolean> b = seriesScanUtil.hasNextChunk();
+    if (!b.isPresent() || !b.get()) {
+      return b;
+    }
+
+    if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) {
+      Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics(0);
+      if (chunkStatistics != null && isUniformSegment(chunkStatistics)) {
+        handleUniformSegment(
+            seriesScanUtil.currentChunkTimeStatistics().getStartTime(), 
chunkStatistics);
+        seriesScanUtil.skipCurrentChunk();
+        return Optional.of(true);
+      }
+    }
+
+    if (readPageData()) {
+      return Optional.of(true);
+    }
+    return Optional.empty();
+  }
+
+  private boolean readPageData() throws IOException {
+    if (!seriesScanUtil.hasNextPage()) {
+      return false;
+    }
+
+    if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) {
+      Statistics pageStatistics = seriesScanUtil.currentPageStatistics(0);
+      if (pageStatistics != null && isUniformSegment(pageStatistics)) {
+        handleUniformSegment(
+            seriesScanUtil.currentPageTimeStatistics().getStartTime(), 
pageStatistics);
+        seriesScanUtil.skipCurrentPage();
+        return true;
+      }
+    }
+
+    TsBlock tsBlock = seriesScanUtil.nextPage();
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return true;
+    }
+    processRawData(tsBlock);
+    return true;
+  }
+
+  // ======================== Statistics-based optimization 
========================
+
+  /** Returns true if all values in the segment are identical (min == max). */
+  private boolean isUniformSegment(Statistics statistics) {
+    return statistics.getMinValue().equals(statistics.getMaxValue());
+  }
+
+  /**
+   * Handles a uniform segment (all values identical). If this is the first 
point ever seen, or if
+   * the uniform value differs from the cached value, emit a single change 
point. Otherwise skip.
+   */
+  private void handleUniformSegment(long startTime, Statistics statistics) {
+    Object uniformValue = statistics.getMinValue();
+
+    if (isFirstPoint) {
+      isFirstPoint = false;
+      updateCacheFromStatistics(uniformValue);
+      emitPoint(startTime, uniformValue);
+    } else if (!valueEqualsCached(uniformValue)) {
+      updateCacheFromStatistics(uniformValue);
+      emitPoint(startTime, uniformValue);
+    } else {
+      updateCacheFromStatistics(uniformValue);
+    }
+  }
+
+  /** Emits a single (timestamp, value) pair into the result builder. */
+  private void emitPoint(long timestamp, Object value) {
+    TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
+    timeColumnBuilder.writeLong(timestamp);
+
+    switch (dataType) {
+      case BOOLEAN:
+        columnBuilder.writeBoolean((Boolean) value);
+        break;
+      case INT32:
+        columnBuilder.writeInt(((Number) value).intValue());
+        break;
+      case INT64:
+        columnBuilder.writeLong(((Number) value).longValue());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(((Number) value).doubleValue());
+        break;
+      case TEXT:
+        columnBuilder.writeBinary((Binary) value);
+        break;
+      default:
+        break;
+    }
+    resultTsBlockBuilder.declarePosition();
+  }
+
+  /** Updates the typed cache fields from a statistics uniform value. */
+  private void updateCacheFromStatistics(Object value) {
+    switch (dataType) {
+      case BOOLEAN:
+        cacheBoolean = (Boolean) value;
+        break;
+      case INT32:
+        cacheInt = ((Number) value).intValue();
+        break;
+      case INT64:
+        cacheLong = ((Number) value).longValue();
+        break;
+      case FLOAT:
+        cacheFloat = ((Number) value).floatValue();
+        break;
+      case DOUBLE:
+        cacheDouble = ((Number) value).doubleValue();
+        break;
+      case TEXT:
+        cacheBinary = (Binary) value;
+        break;
+      default:
+        break;
+    }
+  }
+
+  /** Checks whether the given statistics uniform value equals the cached 
value. */
+  private boolean valueEqualsCached(Object value) {
+    switch (dataType) {
+      case BOOLEAN:
+        return cacheBoolean == (Boolean) value;
+      case INT32:
+        return cacheInt == ((Number) value).intValue();
+      case INT64:
+        return cacheLong == ((Number) value).longValue();
+      case FLOAT:
+        return Float.compare(cacheFloat, ((Number) value).floatValue()) == 0;
+      case DOUBLE:
+        return Double.compare(cacheDouble, ((Number) value).doubleValue()) == 
0;
+      case TEXT:
+        return cacheBinary != null && cacheBinary.equals(value);
+      default:
+        return false;
+    }
+  }
+
+  // ======================== Raw data processing ========================
+
+  /** Row-by-row change point detection on a TsBlock. */
+  private void processRawData(TsBlock tsBlock) {
+    int size = tsBlock.getPositionCount();
+    Column timeColumn = tsBlock.getTimeColumn();
+    Column valueColumn = tsBlock.getColumn(0);
+    TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
+
+    switch (dataType) {
+      case BOOLEAN:
+        processBoolean(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      case INT32:
+        processInt(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      case INT64:
+        processLong(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      case FLOAT:
+        processFloat(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      case DOUBLE:
+        processDouble(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      case TEXT:
+        processText(size, timeColumn, valueColumn, timeColumnBuilder, 
columnBuilder);
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void processBoolean(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      boolean val = valueColumn.getBoolean(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheBoolean = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeBoolean(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (val != cacheBoolean) {
+        cacheBoolean = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeBoolean(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+
+  private void processInt(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      int val = valueColumn.getInt(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheInt = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeInt(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (val != cacheInt) {
+        cacheInt = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeInt(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+
+  private void processLong(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      long val = valueColumn.getLong(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheLong = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeLong(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (val != cacheLong) {
+        cacheLong = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeLong(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+
+  private void processFloat(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      float val = valueColumn.getFloat(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheFloat = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeFloat(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (Float.compare(val, cacheFloat) != 0) {
+        cacheFloat = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeFloat(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+
+  private void processDouble(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      double val = valueColumn.getDouble(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheDouble = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeDouble(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (Double.compare(val, cacheDouble) != 0) {
+        cacheDouble = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeDouble(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+
+  private void processText(
+      int size,
+      Column timeColumn,
+      Column valueColumn,
+      TimeColumnBuilder timeColumnBuilder,
+      ColumnBuilder columnBuilder) {
+    for (int i = 0; i < size; i++) {
+      if (valueColumn.isNull(i)) {
+        continue;
+      }
+      Binary val = valueColumn.getBinary(i);
+      if (isFirstPoint) {
+        isFirstPoint = false;
+        cacheBinary = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeBinary(val);
+        resultTsBlockBuilder.declarePosition();
+      } else if (!val.equals(cacheBinary)) {
+        cacheBinary = val;
+        timeColumnBuilder.writeLong(timeColumn.getLong(i));
+        columnBuilder.writeBinary(val);
+        resultTsBlockBuilder.declarePosition();
+      }
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java
new file mode 100644
index 00000000000..a81fa60cc8a
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/ChangePointOperatorTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.ChangePointOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
+import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
+import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import com.google.common.collect.Sets;
+import io.airlift.units.Duration;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ChangePointOperatorTest {
+
+  // Root path of the test data; DEVICE_ID is derived from it.
+  private static final String SG_NAME = "root.ChangePointOperatorTest";
+  private static final String DEVICE_ID = SG_NAME + ".device0";
+  private static final String MEASUREMENT = "sensor0";
+  // NOTE(review): presumably the number of rows written between flushes when preparing files
+  // (several test Javadocs mention "flush every 20 rows"); confirm against prepareSeqFile.
+  private static final long FLUSH_INTERVAL = 20;
+
+  // Sequence / unsequence file resources of a test; tearDown() removes and clears them.
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  // Created in setUp() and shut down in tearDown().
+  private ExecutorService instanceNotificationExecutor;
+
+  /** Starts a fresh single-threaded notification executor before every test. */
+  @Before
+  public void setUp() {
+    final int poolSize = 1;
+    instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(poolSize, "test-instance-notification");
+  }
+
+  /**
+   * Removes the TsFile resources created by the test, clears reader/metadata caches and data
+   * directories, and always shuts down the notification executor.
+   */
+  @After
+  public void tearDown() throws IOException {
+    try {
+      for (TsFileResource r : seqResources) {
+        r.remove();
+      }
+      for (TsFileResource r : unSeqResources) {
+        r.remove();
+      }
+      seqResources.clear();
+      unSeqResources.clear();
+      FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      BloomFilterCache.getInstance().clear();
+      EnvironmentUtils.cleanAllDir();
+    } finally {
+      // Shut the executor down even if any cleanup step above throws, so a failing cleanup does
+      // not leak a thread into the rest of the test suite.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Every value differs from its predecessor (0, 1, ..., 99), so each of the 100 points is a change
+   * point and must be returned unchanged.
+   */
+  @Test
+  public void testAllDistinctValues() throws Exception {
+    final int pointCount = 100;
+    int[] values = new int[pointCount];
+    for (int t = 0; t < pointCount; t++) {
+      values[t] = t;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(false);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(pointCount, result.size());
+    for (int t = 0; t < pointCount; t++) {
+      long[] row = result.get(t);
+      assertEquals(t, row[0]);
+      assertEquals(t, row[1]);
+    }
+    operator.close();
+  }
+
+  /** A constant series (100 x 42) has a single change point: its first sample at time 0. */
+  @Test
+  public void testAllSameValues() throws Exception {
+    int[] values = new int[100];
+    int idx = 0;
+    while (idx < values.length) {
+      values[idx++] = 42;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(false);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(1, result.size());
+    long[] only = result.get(0);
+    assertEquals(0, only[0]);
+    assertEquals(42, only[1]);
+    operator.close();
+  }
+
+  /**
+   * Constant series (100 x 42) read with statistics enabled: the result must match the raw path,
+   * i.e. exactly one change point at time 0.
+   */
+  @Test
+  public void testAllSameValuesWithStatistics() throws Exception {
+    int[] values = new int[100];
+    for (int k = values.length - 1; k >= 0; k--) {
+      values[k] = 42;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(true);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(1, result.size());
+    long[] only = result.get(0);
+    assertEquals(0, only[0]);
+    assertEquals(42, only[1]);
+    operator.close();
+  }
+
+  /**
+   * Ten runs of ten equal values each (100, 200, ..., 1000). Only the first point of every run is
+   * a change point.
+   */
+  @Test
+  public void testConsecutiveDuplicateRuns() throws Exception {
+    final int runLength = 10;
+    final int numRuns = 10;
+    int[] values = new int[runLength * numRuns];
+    for (int i = 0; i < values.length; i++) {
+      // Run k (0-based) holds the value (k + 1) * 100.
+      values[i] = (i / runLength + 1) * 100;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(false);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(numRuns, result.size());
+    for (int k = 0; k < numRuns; k++) {
+      long[] row = result.get(k);
+      assertEquals(k * runLength, row[0]);
+      assertEquals((k + 1) * 100, row[1]);
+    }
+    operator.close();
+  }
+
+  /**
+   * Same runs as testConsecutiveDuplicateRuns, read with statistics enabled; the change points
+   * must be identical.
+   */
+  @Test
+  public void testConsecutiveDuplicateRunsWithStatistics() throws Exception {
+    final int runLength = 10;
+    final int numRuns = 10;
+    int[] values = new int[runLength * numRuns];
+    for (int i = 0; i < values.length; i++) {
+      // Run k (0-based) holds the value (k + 1) * 100.
+      values[i] = (i / runLength + 1) * 100;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(true);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(numRuns, result.size());
+    for (int k = 0; k < numRuns; k++) {
+      long[] row = result.get(k);
+      assertEquals(k * runLength, row[0]);
+      assertEquals((k + 1) * 100, row[1]);
+    }
+    operator.close();
+  }
+
+  /** Values alternate 1, 2, 1, 2, ... so every one of the 60 points is a change point. */
+  @Test
+  public void testAlternatingValues() throws Exception {
+    final int pointCount = 60;
+    int[] values = new int[pointCount];
+    for (int i = 0; i < pointCount; i++) {
+      values[i] = 1 + i % 2;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operator = createOperator(false);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(pointCount, result.size());
+    for (int i = 0; i < pointCount; i++) {
+      long[] row = result.get(i);
+      assertEquals(i, row[0]);
+      assertEquals(1 + i % 2, row[1]);
+    }
+    operator.close();
+  }
+
+  /** A series with one sample (99 at time 0) yields exactly that sample as its change point. */
+  @Test
+  public void testSinglePoint() throws Exception {
+    int[] single = {99};
+    prepareSeqFile(0, single);
+
+    ChangePointOperator operator = createOperator(false);
+    List<long[]> result = collectResults(operator);
+
+    assertEquals(1, result.size());
+    long[] row = result.get(0);
+    assertEquals(0, row[0]);
+    assertEquals(99, row[1]);
+    operator.close();
+  }
+
+  /**
+   * Data spanning several pages, each with a constant value (the helper is described as flushing
+   * every 20 rows): times 0-39 hold 100 and times 40-59 hold 200. With statistics enabled, uniform
+   * pages are either skipped or contribute a single point.
+   *
+   * <p>Expected: 2 change points, (0, 100) and (40, 200).
+   */
+  @Test
+  public void testStatisticsSkipAcrossPages() throws Exception {
+    int[] values = new int[60];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = i < 40 ? 100 : 200;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operatorWithStats = createOperator(true);
+    List<long[]> resultWithStats = collectResults(operatorWithStats);
+    operatorWithStats.close();
+
+    assertEquals(2, resultWithStats.size());
+    long[] first = resultWithStats.get(0);
+    long[] second = resultWithStats.get(1);
+    assertEquals(0, first[0]);
+    assertEquals(100, first[1]);
+    assertEquals(40, second[0]);
+    assertEquals(200, second[1]);
+  }
+
+  /**
+   * Same data as testStatisticsSkipAcrossPages, read without statistics: the raw-data path must
+   * produce the same two change points, (0, 100) and (40, 200).
+   */
+  @Test
+  public void testNoStatisticsAcrossPages() throws Exception {
+    int[] values = new int[60];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = i < 40 ? 100 : 200;
+    }
+    prepareSeqFile(0, values);
+
+    ChangePointOperator operatorNoStats = createOperator(false);
+    List<long[]> resultNoStats = collectResults(operatorNoStats);
+    operatorNoStats.close();
+
+    assertEquals(2, resultNoStats.size());
+    long[] first = resultNoStats.get(0);
+    long[] second = resultNoStats.get(1);
+    assertEquals(0, first[0]);
+    assertEquals(100, first[1]);
+    assertEquals(40, second[0]);
+    assertEquals(200, second[1]);
+  }
+
+  /**
+   * Checks that the statistics and raw-data paths agree on mixed data where some pages are uniform
+   * and others are not, and that both produce the expected change points.
+   *
+   * <p>Page 0 (0-19): all 5 (uniform), Page 1 (20-39): 5,6,5,6,... (non-uniform), Page 2 (40-59):
+   * all 6 (uniform). Expected: (0, 5) followed by one change point for every time 21..39 (values
+   * 6,5,6,...,6); the trailing run of 6 adds nothing. That is 20 points in total.
+   */
+  @Test
+  public void testStatisticsAndRawPathConsistency() throws Exception {
+    prepareSeqFile(0, buildMixedUniformAndAlternatingValues());
+
+    ChangePointOperator opWithStats = createOperator(true);
+    List<long[]> resultWithStats = collectResults(opWithStats);
+    opWithStats.close();
+
+    tearDownResources();
+
+    // Re-create the same data from a fresh array.
+    prepareSeqFile(0, buildMixedUniformAndAlternatingValues());
+
+    ChangePointOperator opNoStats = createOperator(false);
+    List<long[]> resultNoStats = collectResults(opNoStats);
+    opNoStats.close();
+
+    assertEquals(resultNoStats.size(), resultWithStats.size());
+    for (int i = 0; i < resultNoStats.size(); i++) {
+      assertEquals(
+          "Mismatch at index " + i + " timestamp",
+          resultNoStats.get(i)[0],
+          resultWithStats.get(i)[0]);
+      assertEquals(
+          "Mismatch at index " + i + " value",
+          resultNoStats.get(i)[1],
+          resultWithStats.get(i)[1]);
+    }
+
+    // Pin the absolute result too, so a defect shared by both paths cannot go unnoticed.
+    assertEquals(20, resultNoStats.size());
+    assertEquals(0, resultNoStats.get(0)[0]);
+    assertEquals(5, resultNoStats.get(0)[1]);
+    for (int k = 1; k < resultNoStats.size(); k++) {
+      long expectedTime = 20L + k;
+      assertEquals(expectedTime, resultNoStats.get(k)[0]);
+      assertEquals(expectedTime % 2 == 0 ? 5 : 6, resultNoStats.get(k)[1]);
+    }
+  }
+
+  /** Builds 60 values: 5 for 0-19, alternating 5/6 (even/odd index) for 20-39, 6 for 40-59. */
+  private static int[] buildMixedUniformAndAlternatingValues() {
+    int[] values = new int[60];
+    for (int i = 0; i < 20; i++) {
+      values[i] = 5;
+    }
+    for (int i = 20; i < 40; i++) {
+      values[i] = (i % 2 == 0) ? 5 : 6;
+    }
+    for (int i = 40; i < 60; i++) {
+      values[i] = 6;
+    }
+    return values;
+  }
+
+  /**
+   * Multiple seq files. File 0: all value 10 (time 0-49), File 1: all value 
20 (time 50-99). The
+   * operator should output 2 change points across files.
+   */
+  @Test
+  public void testMultipleFiles() throws Exception {
+    int[] values1 = new int[50];
+    for (int i = 0; i < 50; i++) {
+      values1[i] = 10;
+    }
+    prepareSeqFile(0, values1);
+
+    int[] values2 = new int[50];
+    for (int i = 0; i < 50; i++) {
+      values2[i] = 20;
+    }
+    prepareSeqFile(50, values2);
+
+    ChangePointOperator operator = createOperator(true);
+    List<long[]> result = collectResults(operator);
+    operator.close();
+
+    assertEquals(2, result.size());
+    assertEquals(0, result.get(0)[0]);
+    assertEquals(10, result.get(0)[1]);
+    assertEquals(50, result.get(1)[0]);
+    assertEquals(20, result.get(1)[1]);
+  }
+
+  /**
+   * Multiple files where the value does NOT change across file boundary. File 
0: all value 10 (time
+   * 0-49), File 1: all value 10 (time 50-99). Should output only 1 change 
point.
+   */
+  @Test
+  public void testMultipleFilesSameValue() throws Exception {
+    int[] values1 = new int[50];
+    for (int i = 0; i < 50; i++) {
+      values1[i] = 10;
+    }
+    prepareSeqFile(0, values1);
+
+    int[] values2 = new int[50];
+    for (int i = 0; i < 50; i++) {
+      values2[i] = 10;
+    }
+    prepareSeqFile(50, values2);
+
+    ChangePointOperator operator = createOperator(true);
+    List<long[]> result = collectResults(operator);
+    operator.close();
+
+    assertEquals(1, result.size());
+    assertEquals(0, result.get(0)[0]);
+    assertEquals(10, result.get(0)[1]);
+  }
+
+  /**
+   * Verifies isFinished() returns true after all data is consumed.
+   */
+  @Test
+  public void testIsFinished() throws Exception {
+    prepareSeqFile(0, new int[] {1, 1, 2, 2, 3});
+
+    ChangePointOperator operator = createOperator(false);
+    assertFalse(operator.isFinished());
+
+    while (operator.hasNext()) {
+      operator.next();
+    }
+    assertTrue(operator.isFinished());
+    operator.close();
+  }
+
+  // ==================== Helper methods ====================
+
+  private ChangePointOperator createOperator(boolean canUseStatistics) throws 
Exception {
+    IFullPath measurementPath =
+        new NonAlignedFullPath(
+            IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID),
+            new MeasurementSchema(MEASUREMENT, TSDataType.INT32));
+    Set<String> allSensors = Sets.newHashSet(MEASUREMENT);
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    driverContext.addOperatorContext(
+        1, planNodeId, ChangePointOperator.class.getSimpleName());
+
+    SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+    scanOptionsBuilder.withAllSensors(allSensors);
+
+    ChangePointOperator operator =
+        new ChangePointOperator(
+            driverContext.getOperatorContexts().get(0),
+            planNodeId,
+            measurementPath,
+            Ordering.ASC,
+            scanOptionsBuilder.build(),
+            canUseStatistics);
+
+    operator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+    operator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+
+    return operator;
+  }
+
+  /**
+   * Collects all (timestamp, int_value) pairs from the operator output.
+   */
+  private List<long[]> collectResults(ChangePointOperator operator) throws 
Exception {
+    List<long[]> results = new ArrayList<>();
+    while (operator.hasNext()) {
+      TsBlock tsBlock = operator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        long time = tsBlock.getTimeByIndex(i);
+        int value = tsBlock.getColumn(0).getInt(i);
+        results.add(new long[] {time, value});
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Creates a sequential TsFile with the given INT32 values starting at 
timeOffset. Timestamps are
+   * timeOffset, timeOffset+1, ..., timeOffset+values.length-1. The file is 
flushed every {@link
+   * #FLUSH_INTERVAL} rows to create multiple pages.
+   */
+  private void prepareSeqFile(long timeOffset, int[] values) throws Exception {
+    int fileIndex = seqResources.size();
+    File file = new File(TestConstant.getTestTsFilePath(SG_NAME, 0, 0, 
fileIndex));
+    TsFileResource resource = new TsFileResource(file);
+    resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+    resource.setMinPlanIndex(fileIndex);
+    resource.setMaxPlanIndex(fileIndex);
+    resource.setVersion(fileIndex);
+
+    IMeasurementSchema schema =
+        new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.PLAIN, 
CompressionType.UNCOMPRESSED);
+
+    if (!file.getParentFile().exists()) {
+      Assert.assertTrue(file.getParentFile().mkdirs());
+    }
+    TsFileWriter writer = new TsFileWriter(file);
+    Map<String, IMeasurementSchema> template = new HashMap<>();
+    template.put(schema.getMeasurementName(), schema);
+    writer.registerSchemaTemplate("template0", template, false);
+    writer.registerDevice(DEVICE_ID, "template0");
+
+    for (int i = 0; i < values.length; i++) {
+      long timestamp = timeOffset + i;
+      TSRecord record = new TSRecord(DEVICE_ID, timestamp);
+      record.addTuple(new IntDataPoint(MEASUREMENT, values[i]));
+      writer.writeRecord(record);
+
+      resource.updateStartTime(
+          IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID), timestamp);
+      resource.updateEndTime(
+          IDeviceID.Factory.DEFAULT_FACTORY.create(DEVICE_ID), timestamp);
+
+      if ((i + 1) % FLUSH_INTERVAL == 0) {
+        writer.flush();
+      }
+    }
+    writer.close();
+
+    seqResources.add(resource);
+  }
+
+  private void tearDownResources() throws IOException {
+    for (TsFileResource r : seqResources) {
+      r.remove();
+    }
+    for (TsFileResource r : unSeqResources) {
+      r.remove();
+    }
+    seqResources.clear();
+    unSeqResources.clear();
+    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    BloomFilterCache.getInstance().clear();
+  }
+}


Reply via email to