This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/change_point_in_window
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a616c51710ba16f371f0738d4b55b91894b06b0
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Tue Mar 3 15:39:47 2026 +0800

    Change Point optimization prototype.
---
 .../relational/it/db/it/IoTDBChangePointIT.java    | 196 ++++++++++++++
 .../operator/process/TableChangePointOperator.java | 286 +++++++++++++++++++++
 .../plan/planner/TableOperatorGenerator.java       |  28 ++
 .../plan/planner/plan/node/PlanNodeType.java       |   4 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../distribute/TableDistributedPlanGenerator.java  |  16 ++
 .../ReplaceFilterWindowLeadWithChangePoint.java    | 190 ++++++++++++++
 .../relational/planner/node/ChangePointNode.java   | 140 ++++++++++
 .../plan/relational/planner/node/Patterns.java     |   4 +
 .../optimizations/DistributedOptimizeFactory.java  |   6 +
 .../planner/ChangePointOptimizationTest.java       | 111 ++++++++
 .../planner/assertions/PlanMatchPattern.java       |   5 +
 12 files changed, 991 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java
new file mode 100644
index 00000000000..edfaa57a9f9
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the change-point query shape: a LEAD(measurement) window wrapped in a
+ * filter that keeps a row when its value differs from the next one or when there is no next row.
+ * Each test checks the rows returned for one small, hand-written table.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBChangePointIT {
+  private static final String DATABASE_NAME = "test";
+
+  /** Column names every change-point query in this class is expected to return. */
+  private static final String[] EXPECTED_HEADER = {"time", "device", "measurement", "next"};
+
+  private static final String[] SETUP_SQLS =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        // Table with consecutive duplicate values for change-point detection
+        "CREATE TABLE cp_data (device STRING TAG, measurement INT64 FIELD)",
+        // d1: values 10,10,20,20,20,30 -> change points at the last row of each run (10,20,30)
+        "INSERT INTO cp_data VALUES (1, 'd1', 10)",
+        "INSERT INTO cp_data VALUES (2, 'd1', 10)",
+        "INSERT INTO cp_data VALUES (3, 'd1', 20)",
+        "INSERT INTO cp_data VALUES (4, 'd1', 20)",
+        "INSERT INTO cp_data VALUES (5, 'd1', 20)",
+        "INSERT INTO cp_data VALUES (6, 'd1', 30)",
+        // d2: values 100,200,200,300 -> change points at the last row of each run (100,200,300)
+        "INSERT INTO cp_data VALUES (1, 'd2', 100)",
+        "INSERT INTO cp_data VALUES (2, 'd2', 200)",
+        "INSERT INTO cp_data VALUES (3, 'd2', 200)",
+        "INSERT INTO cp_data VALUES (4, 'd2', 300)",
+        // Table for all-same-values test
+        "CREATE TABLE cp_same (device STRING TAG, measurement INT64 FIELD)",
+        "INSERT INTO cp_same VALUES (1, 'd1', 42)",
+        "INSERT INTO cp_same VALUES (2, 'd1', 42)",
+        "INSERT INTO cp_same VALUES (3, 'd1', 42)",
+        // Table for all-different-values test
+        "CREATE TABLE cp_diff (device STRING TAG, measurement INT64 FIELD)",
+        "INSERT INTO cp_diff VALUES (1, 'd1', 1)",
+        "INSERT INTO cp_diff VALUES (2, 'd1', 2)",
+        "INSERT INTO cp_diff VALUES (3, 'd1', 3)",
+        // Table for single row test
+        "CREATE TABLE cp_single (device STRING TAG, measurement INT64 FIELD)",
+        "INSERT INTO cp_single VALUES (1, 'd1', 99)",
+        "FLUSH",
+        "CLEAR ATTRIBUTE CACHE",
+      };
+
+  @BeforeClass
+  public static void setUp() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024);
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : SETUP_SQLS) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("setUp failed: " + e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * Builds the change-point query against the given table. The statement is identical for every
+   * test in this class apart from the table name.
+   *
+   * @param table name of the table to read; must have "device" and "measurement" columns
+   * @return the complete SQL text
+   */
+  private static String changePointSql(String table) {
+    return "SELECT time, device, measurement, next FROM "
+        + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM "
+        + table
+        + ") "
+        + "WHERE measurement <> next OR next IS NULL "
+        + "ORDER BY device, time";
+  }
+
+  @Test
+  public void testChangePointDetection() {
+    // The change-point SQL pattern:
+    //   SELECT * FROM (SELECT *, LEAD(measurement) OVER (...) AS next FROM t)
+    //   WHERE measurement <> next OR next IS NULL
+    // This detects the last row of each consecutive run of identical values.
+    //
+    // d1 values: 10,10,20,20,20,30
+    //   row 1 (10): next=10, 10!=10 false -> not emitted
+    //   row 2 (10): next=20, 10!=20 true  -> emitted (last 10 before 20)
+    //   row 3 (20): next=20, 20!=20 false -> not emitted
+    //   row 4 (20): next=20, 20!=20 false -> not emitted
+    //   row 5 (20): next=30, 20!=30 true  -> emitted (last 20 before 30)
+    //   row 6 (30): next=NULL             -> emitted (last row)
+    //
+    // d2 values: 100,200,200,300
+    //   row 1 (100): next=200, true  -> emitted
+    //   row 2 (200): next=200, false -> not emitted
+    //   row 3 (200): next=300, true  -> emitted
+    //   row 4 (300): next=NULL       -> emitted
+    String[] expectedRows =
+        new String[] {
+          "1970-01-01T00:00:00.002Z,d1,10,20,",
+          "1970-01-01T00:00:00.005Z,d1,20,30,",
+          "1970-01-01T00:00:00.006Z,d1,30,null,",
+          "1970-01-01T00:00:00.001Z,d2,100,200,",
+          "1970-01-01T00:00:00.003Z,d2,200,300,",
+          "1970-01-01T00:00:00.004Z,d2,300,null,",
+        };
+
+    tableResultSetEqualTest(
+        changePointSql("cp_data"), EXPECTED_HEADER, expectedRows, DATABASE_NAME);
+  }
+
+  @Test
+  public void testChangePointAllSameValues() {
+    // All values are the same -> only the last row should be emitted (next IS NULL)
+    String[] expectedRows =
+        new String[] {
+          "1970-01-01T00:00:00.003Z,d1,42,null,",
+        };
+
+    tableResultSetEqualTest(
+        changePointSql("cp_same"), EXPECTED_HEADER, expectedRows, DATABASE_NAME);
+  }
+
+  @Test
+  public void testChangePointAllDifferentValues() {
+    // All values are different -> every row should be emitted
+    String[] expectedRows =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,d1,1,2,",
+          "1970-01-01T00:00:00.002Z,d1,2,3,",
+          "1970-01-01T00:00:00.003Z,d1,3,null,",
+        };
+
+    tableResultSetEqualTest(
+        changePointSql("cp_diff"), EXPECTED_HEADER, expectedRows, DATABASE_NAME);
+  }
+
+  @Test
+  public void testChangePointSingleRow() {
+    // Single row -> always emitted (next IS NULL)
+    String[] expectedRows =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,d1,99,null,",
+        };
+
+    tableResultSetEqualTest(
+        changePointSql("cp_single"), EXPECTED_HEADER, expectedRows, DATABASE_NAME);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java
new file mode 100644
index 00000000000..b108363a9df
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+/**
+ * Streaming operator that keeps only the last row of every run of equal values in one monitored
+ * column ("last of run" change-point detection).
+ *
+ * <p>The operator remembers the most recent row whose monitored value is non-null. When a later
+ * non-null value differs, the remembered row is emitted together with that new value in an extra
+ * trailing "next" column. When the input is exhausted, the remembered row is emitted with a NULL
+ * "next". The output therefore consists of all child columns followed by "next".
+ *
+ * <p>This replaces a Filter(Window(LEAD(...))) pattern: SELECT * FROM (SELECT *, LEAD(col) OVER
+ * (PARTITION BY ... ORDER BY ...) AS next FROM t) WHERE col != next OR next IS NULL
+ *
+ * <p>NOTE(review): rows whose monitored value is NULL are skipped entirely, and the operator is
+ * given no partition channels, so it compares consecutive input rows regardless of any PARTITION
+ * BY clause. Confirm the planner only produces this operator where that matches the LEAD-based
+ * query it replaces.
+ */
+public class TableChangePointOperator implements ProcessOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableChangePointOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final int measurementColumnIndex;
+  private final TSDataType measurementDataType;
+  private final List<TSDataType> childOutputTypes;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  // The candidate "last of run" row is kept as a (block, position) reference, not as a copy.
+  private boolean hasBufferedRow = false;
+  private TsBlock bufferedBlock;
+  private int bufferedPosition;
+
+  // Set by hasNext() once the child reports that it has no more data.
+  private boolean childExhausted = false;
+  private boolean finished = false;
+
+  /**
+   * @param operatorContext context registered for this operator
+   * @param inputOperator child operator supplying the rows to scan
+   * @param measurementColumnIndex index of the monitored column within the child's output
+   * @param childOutputTypes data types of the child's output columns, in order
+   */
+  public TableChangePointOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      int measurementColumnIndex,
+      List<TSDataType> childOutputTypes) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.measurementColumnIndex = measurementColumnIndex;
+    this.childOutputTypes = childOutputTypes;
+    this.measurementDataType = childOutputTypes.get(measurementColumnIndex);
+
+    // Output layout: every child column, then one "next" column typed like the monitored column.
+    List<TSDataType> outputDataTypes = new ArrayList<>(childOutputTypes);
+    outputDataTypes.add(measurementDataType);
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Consumes at most one input block and returns the change-point rows it completed, or, once the
+   * child is exhausted, flushes the remembered row with a NULL "next".
+   *
+   * @return the rows produced by this call, or null when nothing was produced
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (finished) {
+      return null;
+    }
+
+    tsBlockBuilder.reset();
+
+    if (childExhausted) {
+      // No successor can arrive any more: the remembered row closes the final run.
+      if (hasBufferedRow) {
+        emitBufferedRowWithNullNext();
+        hasBufferedRow = false;
+      }
+      finished = true;
+    } else {
+      TsBlock inputBlock = inputOperator.nextWithTimer();
+      if (inputBlock == null) {
+        return null;
+      }
+      processBlock(inputBlock);
+    }
+
+    if (tsBlockBuilder.isEmpty()) {
+      return null;
+    }
+
+    return tsBlockBuilder.build(
+        new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount()));
+  }
+
+  /**
+   * Scans one input block. Each non-null monitored value either closes the current run (the
+   * remembered row is emitted with this value as "next") or extends it; in both cases the current
+   * row becomes the remembered row.
+   */
+  private void processBlock(TsBlock inputBlock) {
+    Column measurementColumn = inputBlock.getColumn(measurementColumnIndex);
+    int rowCount = inputBlock.getPositionCount();
+
+    for (int i = 0; i < rowCount; i++) {
+      if (measurementColumn.isNull(i)) {
+        // Rows with a NULL monitored value are never buffered, emitted or used as "next".
+        continue;
+      }
+
+      if (hasBufferedRow && valueChanged(measurementColumn, i)) {
+        emitBufferedRowWithNext(inputBlock, i);
+      }
+      bufferRow(inputBlock, i);
+    }
+  }
+
+  /**
+   * Compares the remembered monitored value with the one at {@code newPosition}. Both positions
+   * are expected to be non-null.
+   *
+   * @return true when the two values differ
+   * @throws UnsupportedOperationException if the monitored column has a type this operator cannot
+   *     compare; failing is preferred over silently reporting "no change" for every row
+   */
+  private boolean valueChanged(Column newMeasurementColumn, int newPosition) {
+    Column bufferedMeasurementColumn = bufferedBlock.getColumn(measurementColumnIndex);
+    switch (measurementDataType) {
+      case BOOLEAN:
+        return bufferedMeasurementColumn.getBoolean(bufferedPosition)
+            != newMeasurementColumn.getBoolean(newPosition);
+      case INT32:
+      case DATE:
+        return bufferedMeasurementColumn.getInt(bufferedPosition)
+            != newMeasurementColumn.getInt(newPosition);
+      case INT64:
+      case TIMESTAMP:
+        return bufferedMeasurementColumn.getLong(bufferedPosition)
+            != newMeasurementColumn.getLong(newPosition);
+      case FLOAT:
+        // Float.compare treats NaN as equal to NaN and distinguishes 0.0f from -0.0f.
+        return Float.compare(
+                bufferedMeasurementColumn.getFloat(bufferedPosition),
+                newMeasurementColumn.getFloat(newPosition))
+            != 0;
+      case DOUBLE:
+        // Double.compare treats NaN as equal to NaN and distinguishes 0.0 from -0.0.
+        return Double.compare(
+                bufferedMeasurementColumn.getDouble(bufferedPosition),
+                newMeasurementColumn.getDouble(newPosition))
+            != 0;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        Binary bufferedVal = bufferedMeasurementColumn.getBinary(bufferedPosition);
+        Binary newVal = newMeasurementColumn.getBinary(newPosition);
+        return !bufferedVal.equals(newVal);
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported data type for change point detection: " + measurementDataType);
+    }
+  }
+
+  /** Remembers the given row as the current candidate for "last row of the run". */
+  private void bufferRow(TsBlock block, int position) {
+    hasBufferedRow = true;
+    bufferedBlock = block;
+    bufferedPosition = position;
+  }
+
+  /** Copies every child column of the remembered row into the output builder. */
+  private void appendBufferedChildColumns() {
+    for (int col = 0; col < childOutputTypes.size(); col++) {
+      Column column = bufferedBlock.getColumn(col);
+      ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(col);
+      if (column.isNull(bufferedPosition)) {
+        builder.appendNull();
+      } else {
+        builder.write(column, bufferedPosition);
+      }
+    }
+  }
+
+  /** Emits the remembered row with "next" taken from the monitored column of the given row. */
+  private void emitBufferedRowWithNext(TsBlock nextBlock, int nextPosition) {
+    appendBufferedChildColumns();
+
+    ColumnBuilder nextBuilder = tsBlockBuilder.getColumnBuilder(childOutputTypes.size());
+    Column nextMeasurementColumn = nextBlock.getColumn(measurementColumnIndex);
+    if (nextMeasurementColumn.isNull(nextPosition)) {
+      nextBuilder.appendNull();
+    } else {
+      nextBuilder.write(nextMeasurementColumn, nextPosition);
+    }
+
+    tsBlockBuilder.declarePosition();
+  }
+
+  /**
+   * Emits the remembered row with a NULL "next". The "next" builder is always written so that
+   * every column builder stays aligned with the declared position count.
+   */
+  private void emitBufferedRowWithNullNext() {
+    appendBufferedChildColumns();
+    tsBlockBuilder.getColumnBuilder(childOutputTypes.size()).appendNull();
+    tsBlockBuilder.declarePosition();
+  }
+
+  /**
+   * Reports whether another call to {@link #next()} can still produce output. As a side effect,
+   * records that the child is exhausted the first time the child reports no more data.
+   */
+  @Override
+  public boolean hasNext() throws Exception {
+    if (finished) {
+      return false;
+    }
+    if (hasBufferedRow && childExhausted) {
+      return true;
+    }
+    if (inputOperator.hasNext()) {
+      return true;
+    }
+    childExhausted = true;
+    return hasBufferedRow;
+  }
+
+  @Override
+  public void close() throws Exception {
+    inputOperator.close();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return finished;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter();
+    long maxPeekMemoryFromCurrent =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    return Math.max(maxPeekMemoryFromInput, maxPeekMemoryFromCurrent)
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // One extra block is budgeted on top of the child's retained size; presumably this covers
+    // the input block kept alive through bufferedBlock.
+    return inputOperator.calculateRetainedSizeAfterCallingNext()
+        + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return inputOperator.isBlocked();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1bc788251a7..cb125831cc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -101,6 +101,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.exp
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.IrRowPatternToProgramRewriter;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Matcher;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Program;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.TableChangePointOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.RowNumberOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.TopKRankingOperator;
@@ -208,6 +209,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
@@ -4306,6 +4308,32 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         10_000);
   }
 
+  /**
+   * Builds the {@link TableChangePointOperator} for a {@link ChangePointNode}. The child is
+   * planned first, then an operator context is registered, and finally the channel of the
+   * monitored column is looked up in the child's output layout.
+   *
+   * @throws IllegalStateException if the monitored symbol is not among the child's output symbols
+   */
+  @Override
+  public Operator visitChangePoint(ChangePointNode node, LocalExecutionPlanContext context) {
+    Operator childOperator = node.getChild().accept(this, context);
+
+    String operatorName = TableChangePointOperator.class.getSimpleName();
+    OperatorContext changePointContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorName);
+
+    Map<Symbol, Integer> channelBySymbol =
+        makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());
+    List<TSDataType> childTypes = getOutputColumnTypes(node.getChild(), context.getTypeProvider());
+
+    Integer measurementChannel = channelBySymbol.get(node.getMeasurementSymbol());
+    if (measurementChannel != null) {
+      return new TableChangePointOperator(
+          changePointContext, childOperator, measurementChannel, childTypes);
+    }
+    throw new IllegalStateException(
+        "Measurement symbol not found in child output: " + node.getMeasurementSymbol());
+  }
+
   @Override
   public Operator visitTopKRanking(TopKRankingNode node, 
LocalExecutionPlanContext context) {
     Operator child = node.getChild().accept(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 55aaefe8be6..d56ae531580 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -129,6 +129,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
@@ -323,6 +324,7 @@ public enum PlanNodeType {
   TABLE_TOPK_RANKING_NODE((short) 1037),
   TABLE_ROW_NUMBER_NODE((short) 1038),
   TABLE_VALUES_NODE((short) 1039),
+  TABLE_CHANGE_POINT_NODE((short) 1040),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -727,6 +729,8 @@ public enum PlanNodeType {
         return RowNumberNode.deserialize(buffer);
       case 1039:
         return ValuesNode.deserialize(buffer);
+      case 1040:
+        return ChangePointNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 44f1cd8bc1f..944824f7f91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -134,6 +134,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
@@ -799,6 +800,10 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
+  /**
+   * Visits a {@link ChangePointNode}. By default it is handled like any other single-child
+   * process node; visitors that need change-point specific behaviour override this method.
+   */
+  public R visitChangePoint(ChangePointNode node, C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitValuesNode(ValuesNode node, C context) {
     return visitPlan(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f..bff078a811f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -76,6 +76,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
@@ -1877,6 +1878,21 @@ public class TableDistributedPlanGenerator
     }
   }
 
+  /**
+   * Distributes a {@link ChangePointNode}. A node without children is returned unchanged. When
+   * the child yields exactly one distributed node, the original node is re-attached on top of
+   * it; otherwise the node is split so that each distributed child gets its own copy.
+   *
+   * <p>NOTE(review): splitting per child means runs of equal values are evaluated inside each
+   * child separately; confirm this cannot cut a run that spans two children.
+   */
+  @Override
+  public List<PlanNode> visitChangePoint(ChangePointNode node, PlanContext context) {
+    if (node.getChildren().isEmpty()) {
+      return Collections.singletonList(node);
+    }
+
+    List<PlanNode> distributedChildren = node.getChild().accept(this, context);
+    if (distributedChildren.size() != 1) {
+      return splitForEachChild(node, distributedChildren);
+    }
+
+    node.setChild(distributedChildren.get(0));
+    return Collections.singletonList(node);
+  }
+
   @Override
   public List<PlanNode> visitTopKRanking(TopKRankingNode node, PlanContext 
context) {
     Optional<OrderingScheme> orderingScheme = 
node.getSpecification().getOrderingScheme();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java
new file mode 100644
index 00000000000..abe49a626e0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.IsNullPredicate;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Rewrites the change-point detection shape {@code Filter(Window(LEAD))} into a single {@link
+ * ChangePointNode}.
+ *
+ * <p>The SQL shape being matched is:
+ *
+ * <pre>{@code
+ * SELECT * FROM (
+ *   SELECT *, LEAD(col) OVER (PARTITION BY ... ORDER BY ...) AS next FROM t
+ * ) WHERE col != next OR next IS NULL
+ * }</pre>
+ *
+ * <p>The window must carry exactly one function named {@code lead} whose first argument is a plain
+ * symbol reference and whose offset is either omitted or the literal 1. The filter predicate must
+ * be exactly the two-term OR shown above; the two terms, and the two operands of the comparison,
+ * may appear in either order.
+ *
+ * <p>NOTE(review): the window's partitioning/ordering specification, frame and null treatment are
+ * not inspected by this rule and are not carried into the ChangePointNode; confirm that the
+ * operator side accounts for them.
+ */
+public class ReplaceFilterWindowLeadWithChangePoint implements Rule<FilterNode> {
+
+  private static final Capture<WindowNode> WINDOW_CAPTURE = newCapture();
+
+  private final Pattern<FilterNode> pattern;
+
+  public ReplaceFilterWindowLeadWithChangePoint() {
+    Pattern<WindowNode> leadWindow =
+        window()
+            .matching(ReplaceFilterWindowLeadWithChangePoint::isLeadWindow)
+            .capturedAs(WINDOW_CAPTURE);
+    this.pattern = filter().with(source().matching(leadWindow));
+  }
+
+  @Override
+  public Pattern<FilterNode> getPattern() {
+    return pattern;
+  }
+
+  /**
+   * Replaces the matched filter with a ChangePointNode placed directly on the window's child, or
+   * returns an empty result when the filter predicate is not the expected change-point predicate.
+   * The new node reuses the filter's plan node id.
+   */
+  @Override
+  public Result apply(FilterNode filterNode, Captures captures, Context context) {
+    WindowNode leadWindow = captures.get(WINDOW_CAPTURE);
+
+    Map.Entry<Symbol, WindowNode.Function> onlyFunction =
+        getOnlyElement(leadWindow.getWindowFunctions().entrySet());
+    Symbol nextSymbol = onlyFunction.getKey();
+    List<Expression> leadArguments = onlyFunction.getValue().getArguments();
+
+    // Defensive re-check of what isLeadWindow already established for the captured window.
+    boolean firstArgumentIsSymbol =
+        !leadArguments.isEmpty() && leadArguments.get(0) instanceof SymbolReference;
+    if (!firstArgumentIsSymbol) {
+      return Result.empty();
+    }
+
+    String measurementName = ((SymbolReference) leadArguments.get(0)).getName();
+    Symbol measurementSymbol = new Symbol(measurementName);
+    String nextName = nextSymbol.getName();
+
+    if (!isChangePointPredicate(filterNode.getPredicate(), measurementName, nextName)) {
+      return Result.empty();
+    }
+
+    ChangePointNode replacement =
+        new ChangePointNode(
+            filterNode.getPlanNodeId(), leadWindow.getChild(), measurementSymbol, nextSymbol);
+    return Result.ofPlanNode(replacement);
+  }
+
+  /**
+   * Accepts a window node that holds exactly one function, named "lead", called as {@code
+   * lead(symbol)} or {@code lead(symbol, 1)} where the offset is a numeric literal equal to 1.
+   */
+  private static boolean isLeadWindow(WindowNode window) {
+    if (window.getWindowFunctions().size() != 1) {
+      return false;
+    }
+
+    WindowNode.Function function = getOnlyElement(window.getWindowFunctions().values());
+    if (!"lead".equals(function.getResolvedFunction().getSignature().getName())) {
+      return false;
+    }
+
+    List<Expression> arguments = function.getArguments();
+    int argumentCount = arguments.size();
+    // Only the one- and two-argument forms are supported; the first must be a symbol reference.
+    if (argumentCount < 1 || argumentCount > 2 || !(arguments.get(0) instanceof SymbolReference)) {
+      return false;
+    }
+    if (argumentCount == 1) {
+      // Default offset.
+      return true;
+    }
+
+    // Explicit offset: it has to be a literal whose numeric value is 1.
+    Expression offset = arguments.get(1);
+    if (!(offset instanceof Literal)) {
+      return false;
+    }
+    Object offsetValue = ((Literal) offset).getTsValue();
+    return offsetValue instanceof Number && ((Number) offsetValue).longValue() == 1;
+  }
+
+  /**
+   * Checks if the predicate matches: col != next OR next IS NULL, in either order of the OR terms.
+   */
+  private static boolean isChangePointPredicate(
+      Expression predicate, String measurementName, String nextName) {
+    if (!(predicate instanceof LogicalExpression)) {
+      return false;
+    }
+
+    LogicalExpression disjunction = (LogicalExpression) predicate;
+    if (disjunction.getOperator() != LogicalExpression.Operator.OR
+        || disjunction.getTerms().size() != 2) {
+      return false;
+    }
+
+    List<Expression> terms = disjunction.getTerms();
+    return isComparisonThenNullCheck(terms.get(0), terms.get(1), measurementName, nextName)
+        || isComparisonThenNullCheck(terms.get(1), terms.get(0), measurementName, nextName);
+  }
+
+  /** True when {@code comparisonTerm} is col != next and {@code nullTerm} is next IS NULL. */
+  private static boolean isComparisonThenNullCheck(
+      Expression comparisonTerm, Expression nullTerm, String measurementName, String nextName) {
+    return isNotEqualComparison(comparisonTerm, measurementName, nextName)
+        && isNullCheck(nullTerm, nextName);
+  }
+
+  /** True for a NOT_EQUAL comparison between the two named symbols, in either operand order. */
+  private static boolean isNotEqualComparison(
+      Expression expr, String measurementName, String nextName) {
+    if (!(expr instanceof ComparisonExpression)) {
+      return false;
+    }
+
+    ComparisonExpression comparison = (ComparisonExpression) expr;
+    if (comparison.getOperator() != ComparisonExpression.Operator.NOT_EQUAL) {
+      return false;
+    }
+
+    Expression left = comparison.getLeft();
+    Expression right = comparison.getRight();
+    if (isSymbolRef(left, measurementName) && isSymbolRef(right, nextName)) {
+      return true;
+    }
+    return isSymbolRef(left, nextName) && isSymbolRef(right, measurementName);
+  }
+
+  /** True for {@code <symbolName> IS NULL}. */
+  private static boolean isNullCheck(Expression expr, String symbolName) {
+    return expr instanceof IsNullPredicate
+        && isSymbolRef(((IsNullPredicate) expr).getValue(), symbolName);
+  }
+
+  /** True when the expression is a reference to the symbol with exactly the given name. */
+  private static boolean isSymbolRef(Expression expr, String name) {
+    if (!(expr instanceof SymbolReference)) {
+      return false;
+    }
+    return ((SymbolReference) expr).getName().equals(name);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
new file mode 100644
index 00000000000..02b567c8434
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Plan node that stands in for a {@code Filter(Window(LEAD(...)))} change-point query: for a
+ * monitored column within sorted, partitioned input it keeps the last row of each consecutive run
+ * of identical values (this includes the final row, which has no successor).
+ *
+ * <p>The node outputs every output symbol of its child followed by {@link #getNextSymbol()}, the
+ * symbol that the replaced LEAD window function used to produce.
+ */
+public class ChangePointNode extends SingleChildProcessNode {
+
+  /** Symbol of the column whose value changes are monitored. */
+  private final Symbol measurementSymbol;
+
+  /** Symbol appended to the child's outputs (the former LEAD result column). */
+  private final Symbol nextSymbol;
+
+  /**
+   * Creates a node attached to {@code child}.
+   *
+   * @throws NullPointerException if {@code measurementSymbol} or {@code nextSymbol} is null
+   */
+  public ChangePointNode(
+      PlanNodeId id, PlanNode child, Symbol measurementSymbol, Symbol nextSymbol) {
+    super(id, child);
+    // Fail fast here rather than later during serialization or operator generation.
+    this.measurementSymbol =
+        Objects.requireNonNull(measurementSymbol, "measurementSymbol is null");
+    this.nextSymbol = Objects.requireNonNull(nextSymbol, "nextSymbol is null");
+  }
+
+  /**
+   * Creates a node without a child; used by {@link #clone()} and {@link #deserialize(ByteBuffer)}.
+   *
+   * @throws NullPointerException if {@code measurementSymbol} or {@code nextSymbol} is null
+   */
+  public ChangePointNode(PlanNodeId id, Symbol measurementSymbol, Symbol nextSymbol) {
+    super(id);
+    this.measurementSymbol =
+        Objects.requireNonNull(measurementSymbol, "measurementSymbol is null");
+    this.nextSymbol = Objects.requireNonNull(nextSymbol, "nextSymbol is null");
+  }
+
+  public Symbol getMeasurementSymbol() {
+    return measurementSymbol;
+  }
+
+  public Symbol getNextSymbol() {
+    return nextSymbol;
+  }
+
+  /** Returns a copy with the same id and symbols but without a child. */
+  @Override
+  public PlanNode clone() {
+    return new ChangePointNode(getPlanNodeId(), measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitChangePoint(this, context);
+  }
+
+  /** Not supported by this node; callers should use {@link #getOutputSymbols()} instead. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns the child's output symbols followed by the next-value symbol. */
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return ImmutableList.<Symbol>builder()
+        .addAll(getChild().getOutputSymbols())
+        .add(nextSymbol)
+        .build();
+  }
+
+  /** Returns a new node with the same id and symbols on top of the single given child. */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.size() == 1, "wrong number of new children");
+    return new ChangePointNode(id, newChildren.get(0), measurementSymbol, nextSymbol);
+  }
+
+  /** Writes the node type tag, then the measurement symbol, then the next symbol. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(byteBuffer);
+    Symbol.serialize(measurementSymbol, byteBuffer);
+    Symbol.serialize(nextSymbol, byteBuffer);
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(stream);
+    Symbol.serialize(measurementSymbol, stream);
+    Symbol.serialize(nextSymbol, stream);
+  }
+
+  /**
+   * Reads the measurement symbol, the next symbol and then the plan node id from {@code buffer} and
+   * returns a child-less node.
+   *
+   * <p>NOTE(review): the node type tag written by serializeAttributes is not read here; presumably
+   * the caller consumes it before dispatching to this method - confirm against PlanNodeType.
+   */
+  public static ChangePointNode deserialize(ByteBuffer buffer) {
+    Symbol measurementSymbol = Symbol.deserialize(buffer);
+    Symbol nextSymbol = Symbol.deserialize(buffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+    return new ChangePointNode(planNodeId, measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    ChangePointNode that = (ChangePointNode) o;
+    return Objects.equals(measurementSymbol, that.measurementSymbol)
+        && Objects.equals(nextSymbol, that.nextSymbol);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public String toString() {
+    return "ChangePoint-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index c9bf429b183..5e632b2020a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -235,6 +235,10 @@ public final class Patterns {
     return typeOf(WindowNode.class);
   }
 
+  /** Returns a pattern, built with {@code typeOf}, that matches {@link ChangePointNode}s. */
+  public static Pattern<ChangePointNode> changePoint() {
+    return typeOf(ChangePointNode.class);
+  }
+
   public static Pattern<GroupNode> groupNode() {
     return typeOf(GroupNode.class);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
index 3d688610f01..ead8e9e43f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.El
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithMergeSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithMergeSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownOffsetIntoTableScan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ReplaceFilterWindowLeadWithChangePoint;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -41,6 +42,11 @@ public class DistributedOptimizeFactory {
 
     this.planOptimizers =
         ImmutableList.of(
+            // replace Filter(Window(LEAD)) change-point pattern with 
ChangePointNode
+            new IterativeOptimizer(
+                plannerContext,
+                ruleStats,
+                ImmutableSet.of(new ReplaceFilterWindowLeadWithChangePoint())),
             // transfer Limit+Sort to TopK
             new IterativeOptimizer(
                 plannerContext,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java
new file mode 100644
index 00000000000..66312bb6a19
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
+
+import org.junit.Test;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.changePoint;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+
+/**
+ * Planner tests for the rewrite of {@code Filter(Window(LEAD))} into a ChangePointNode.
+ *
+ * <p>The rewrite is expected only in the distributed fragments; the logical plan keeps the Filter
+ * over Window shape, so every negative case must also assert on a distributed fragment.
+ */
+public class ChangePointOptimizationTest {
+
+  @Test
+  public void testChangePointOptimization() {
+    PlanTester planTester = new PlanTester();
+
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 <> next OR next IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Logical plan still has Filter -> Window (optimization happens at distributed level)
+    assertPlan(logicalQueryPlan, output((filter(window(group(tableScan))))));
+
+    // Distributed plan: ChangePointNode replaces Filter -> Window per partition
+    // Fragment 0: Output -> Collect -> Exchange*
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange())));
+    // Fragment 1+: ChangePointNode -> TableScan
+    assertPlan(planTester.getFragmentPlan(1), changePoint(tableScan));
+    assertPlan(planTester.getFragmentPlan(2), changePoint(tableScan));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithLag() {
+    PlanTester planTester = new PlanTester();
+
+    // LAG instead of LEAD should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LAG(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS prev FROM table1) WHERE s1 <> prev OR prev IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Should remain Filter -> Window in the logical plan
+    assertPlan(logicalQueryPlan, output((filter(window(group(tableScan))))));
+
+    // Distributed plan: should still have Filter -> Window per partition (no ChangePoint)
+    assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan)));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithDifferentPredicate() {
+    PlanTester planTester = new PlanTester();
+
+    // Different predicate (s1 = next instead of s1 != next) should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 = next";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // Distributed plan: should still have Filter -> Window (no ChangePoint)
+    assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan)));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithMultipleWindowFunctions() {
+    PlanTester planTester = new PlanTester();
+
+    // Multiple window functions should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS rn FROM table1) WHERE s1 <> next OR next IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Both functions share one window, so the logical plan has a single Filter -> Window
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // The rewrite only happens in the distributed plan, so the logical assertion alone cannot
+    // detect a wrongly applied rule: the fragment must still be Filter -> Window (no ChangePoint)
+    assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan)));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec2..e30f1967cf9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -46,6 +46,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
@@ -481,6 +482,10 @@ public final class PlanMatchPattern {
     return node(RowNumberNode.class, source);
   }
 
+  /** Builds a matcher for a {@link ChangePointNode} on top of the given {@code source} pattern. */
+  public static PlanMatchPattern changePoint(PlanMatchPattern source) {
+    return node(ChangePointNode.class, source);
+  }
+
   public static PlanMatchPattern markDistinct(
       String markerSymbol, List<String> distinctSymbols, PlanMatchPattern 
source) {
     return node(MarkDistinctNode.class, source)

Reply via email to