This is an automated email from the ASF dual-hosted git repository. zhihao pushed a commit to branch perf/szh/change_point_in_window in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a616c51710ba16f371f0738d4b55b91894b06b0 Author: Sh-Zh-7 <[email protected]> AuthorDate: Tue Mar 3 15:39:47 2026 +0800 Change Point optimization prototype. --- .../relational/it/db/it/IoTDBChangePointIT.java | 196 ++++++++++++++ .../operator/process/TableChangePointOperator.java | 286 +++++++++++++++++++++ .../plan/planner/TableOperatorGenerator.java | 28 ++ .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../distribute/TableDistributedPlanGenerator.java | 16 ++ .../ReplaceFilterWindowLeadWithChangePoint.java | 190 ++++++++++++++ .../relational/planner/node/ChangePointNode.java | 140 ++++++++++ .../plan/relational/planner/node/Patterns.java | 4 + .../optimizations/DistributedOptimizeFactory.java | 6 + .../planner/ChangePointOptimizationTest.java | 111 ++++++++ .../planner/assertions/PlanMatchPattern.java | 5 + 12 files changed, 991 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java new file mode 100644 index 00000000000..edfaa57a9f9 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBChangePointIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBChangePointIT { + private static final String DATABASE_NAME = "test"; + + private static final String[] setupSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + // Table with consecutive duplicate values for change-point detection + "CREATE TABLE cp_data (device STRING TAG, measurement INT64 FIELD)", + // d1: values 10,10,20,20,20,30 -> change points at rows with values 10,20,30 (last of + // each run) + "INSERT INTO cp_data VALUES (1, 'd1', 10)", + "INSERT INTO cp_data VALUES (2, 'd1', 10)", + "INSERT INTO cp_data VALUES (3, 'd1', 20)", + "INSERT INTO cp_data VALUES (4, 'd1', 20)", + "INSERT INTO cp_data VALUES (5, 'd1', 20)", + "INSERT INTO cp_data VALUES (6, 'd1', 30)", + // d2: values 100,200,200,300 -> change points at rows with values 100,200,300 + "INSERT INTO cp_data 
VALUES (1, 'd2', 100)", + "INSERT INTO cp_data VALUES (2, 'd2', 200)", + "INSERT INTO cp_data VALUES (3, 'd2', 200)", + "INSERT INTO cp_data VALUES (4, 'd2', 300)", + // Table for all-same-values test + "CREATE TABLE cp_same (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_same VALUES (1, 'd1', 42)", + "INSERT INTO cp_same VALUES (2, 'd1', 42)", + "INSERT INTO cp_same VALUES (3, 'd1', 42)", + // Table for all-different-values test + "CREATE TABLE cp_diff (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_diff VALUES (1, 'd1', 1)", + "INSERT INTO cp_diff VALUES (2, 'd1', 2)", + "INSERT INTO cp_diff VALUES (3, 'd1', 3)", + // Table for single row test + "CREATE TABLE cp_single (device STRING TAG, measurement INT64 FIELD)", + "INSERT INTO cp_single VALUES (1, 'd1', 99)", + "FLUSH", + "CLEAR ATTRIBUTE CACHE", + }; + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024); + EnvFactory.getEnv().initClusterEnvironment(); + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : setupSqls) { + statement.execute(sql); + } + } catch (Exception e) { + e.printStackTrace(); + fail("setUp failed: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testChangePointDetection() { + // The change-point SQL pattern: + // SELECT * FROM (SELECT *, LEAD(measurement) OVER (...) AS next FROM t) WHERE measurement <> + // next OR next IS NULL + // This detects the last row of each consecutive run of identical values. 
+ // + // d1 values: 10,10,20,20,20,30 + // row 1 (10): next=10, 10!=10 false -> not emitted + // row 2 (10): next=20, 10!=20 true -> emitted (last 10 before 20) + // row 3 (20): next=20, 20!=20 false -> not emitted + // row 4 (20): next=20, 20!=20 false -> not emitted + // row 5 (20): next=30, 20!=30 true -> emitted (last 20 before 30) + // row 6 (30): next=NULL -> emitted (last row) + // + // d2 values: 100,200,200,300 + // row 1 (100): next=200, true -> emitted + // row 2 (200): next=200, false -> not emitted + // row 3 (200): next=300, true -> emitted + // row 4 (300): next=NULL -> emitted + + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_data) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.002Z,d1,10,20,", + "1970-01-01T00:00:00.005Z,d1,20,30,", + "1970-01-01T00:00:00.006Z,d1,30,null,", + "1970-01-01T00:00:00.001Z,d2,100,200,", + "1970-01-01T00:00:00.003Z,d2,200,300,", + "1970-01-01T00:00:00.004Z,d2,300,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointAllSameValues() { + // All values are the same -> only the last row should be emitted (next IS NULL) + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_same) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.003Z,d1,42,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointAllDifferentValues() { + 
// All values are different -> every row should be emitted + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_diff) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,1,2,", + "1970-01-01T00:00:00.002Z,d1,2,3,", + "1970-01-01T00:00:00.003Z,d1,3,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void testChangePointSingleRow() { + // Single row -> always emitted (next IS NULL) + String sql = + "SELECT time, device, measurement, next FROM " + + "(SELECT *, LEAD(measurement) OVER (PARTITION BY device ORDER BY time) AS next FROM cp_single) " + + "WHERE measurement <> next OR next IS NULL " + + "ORDER BY device, time"; + + String[] expectedHeader = new String[] {"time", "device", "measurement", "next"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,d1,99,null,", + }; + + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java new file mode 100644 index 00000000000..b108363a9df --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableChangePointOperator.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; + +/** + * TableChangePointOperator implements "last of run" change-point detection. It buffers the latest + * row seen and emits it when the monitored column changes value or input is exhausted. The output + * includes all child columns plus a "next" column containing the new value (or NULL at end). 
+ * + * <p>This replaces a Filter(Window(LEAD(...))) pattern: SELECT * FROM (SELECT *, LEAD(col) OVER + * (PARTITION BY ... ORDER BY ...) AS next FROM t) WHERE col != next OR next IS NULL + */ +public class TableChangePointOperator implements ProcessOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableChangePointOperator.class); + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final int measurementColumnIndex; + private final TSDataType measurementDataType; + private final List<TSDataType> childOutputTypes; + private final TsBlockBuilder tsBlockBuilder; + + private boolean hasBufferedRow = false; + private TsBlock bufferedBlock; + private int bufferedPosition; + + private boolean childExhausted = false; + private boolean finished = false; + + public TableChangePointOperator( + OperatorContext operatorContext, + Operator inputOperator, + int measurementColumnIndex, + List<TSDataType> childOutputTypes) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.measurementColumnIndex = measurementColumnIndex; + this.childOutputTypes = childOutputTypes; + this.measurementDataType = childOutputTypes.get(measurementColumnIndex); + + List<TSDataType> outputDataTypes = new ArrayList<>(childOutputTypes); + outputDataTypes.add(measurementDataType); + this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + if (finished) { + return null; + } + + tsBlockBuilder.reset(); + + if (!childExhausted) { + TsBlock inputBlock = inputOperator.nextWithTimer(); + if (inputBlock == null) { + return null; + } + processBlock(inputBlock); + } + + if (childExhausted && hasBufferedRow) { + emitBufferedRow(true); + hasBufferedRow = false; + } + + if (childExhausted && !hasBufferedRow) { + finished = true; + } + 
+ if (tsBlockBuilder.isEmpty()) { + return null; + } + + return tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + } + + private void processBlock(TsBlock inputBlock) { + Column measurementColumn = inputBlock.getColumn(measurementColumnIndex); + int rowCount = inputBlock.getPositionCount(); + + for (int i = 0; i < rowCount; i++) { + if (measurementColumn.isNull(i)) { + continue; + } + + if (!hasBufferedRow) { + bufferRow(inputBlock, i); + continue; + } + + if (valueChanged(measurementColumn, i)) { + emitBufferedRowWithNext(inputBlock, i); + bufferRow(inputBlock, i); + } else { + bufferRow(inputBlock, i); + } + } + } + + private boolean valueChanged(Column newMeasurementColumn, int newPosition) { + Column bufferedMeasurementColumn = bufferedBlock.getColumn(measurementColumnIndex); + switch (measurementDataType) { + case BOOLEAN: + return bufferedMeasurementColumn.getBoolean(bufferedPosition) + != newMeasurementColumn.getBoolean(newPosition); + case INT32: + return bufferedMeasurementColumn.getInt(bufferedPosition) + != newMeasurementColumn.getInt(newPosition); + case INT64: + case TIMESTAMP: + return bufferedMeasurementColumn.getLong(bufferedPosition) + != newMeasurementColumn.getLong(newPosition); + case FLOAT: + return Float.compare( + bufferedMeasurementColumn.getFloat(bufferedPosition), + newMeasurementColumn.getFloat(newPosition)) + != 0; + case DOUBLE: + return Double.compare( + bufferedMeasurementColumn.getDouble(bufferedPosition), + newMeasurementColumn.getDouble(newPosition)) + != 0; + case TEXT: + case STRING: + case BLOB: + Binary bufferedVal = bufferedMeasurementColumn.getBinary(bufferedPosition); + Binary newVal = newMeasurementColumn.getBinary(newPosition); + return !bufferedVal.equals(newVal); + default: + return false; + } + } + + private void bufferRow(TsBlock block, int position) { + hasBufferedRow = true; + bufferedBlock = block; + bufferedPosition = position; + } + + private void 
emitBufferedRowWithNext(TsBlock nextBlock, int nextPosition) { + for (int col = 0; col < childOutputTypes.size(); col++) { + Column column = bufferedBlock.getColumn(col); + ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(col); + if (column.isNull(bufferedPosition)) { + builder.appendNull(); + } else { + builder.write(column, bufferedPosition); + } + } + + int nextCol = childOutputTypes.size(); + ColumnBuilder nextBuilder = tsBlockBuilder.getColumnBuilder(nextCol); + Column nextMeasurementColumn = nextBlock.getColumn(measurementColumnIndex); + if (nextMeasurementColumn.isNull(nextPosition)) { + nextBuilder.appendNull(); + } else { + nextBuilder.write(nextMeasurementColumn, nextPosition); + } + + tsBlockBuilder.declarePosition(); + } + + private void emitBufferedRow(boolean nextIsNull) { + for (int col = 0; col < childOutputTypes.size(); col++) { + Column column = bufferedBlock.getColumn(col); + ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(col); + if (column.isNull(bufferedPosition)) { + builder.appendNull(); + } else { + builder.write(column, bufferedPosition); + } + } + + int nextCol = childOutputTypes.size(); + ColumnBuilder nextBuilder = tsBlockBuilder.getColumnBuilder(nextCol); + if (nextIsNull) { + nextBuilder.appendNull(); + } + + tsBlockBuilder.declarePosition(); + } + + @Override + public boolean hasNext() throws Exception { + if (finished) { + return false; + } + if (hasBufferedRow && childExhausted) { + return true; + } + if (inputOperator.hasNext()) { + return true; + } + childExhausted = true; + return hasBufferedRow; + } + + @Override + public void close() throws Exception { + inputOperator.close(); + } + + @Override + public boolean isFinished() throws Exception { + return finished; + } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter(); + long maxPeekMemoryFromCurrent = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + 
return Math.max(maxPeekMemoryFromInput, maxPeekMemoryFromCurrent) + + inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long calculateMaxReturnSize() { + return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return inputOperator.calculateRetainedSizeAfterCallingNext() + + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + tsBlockBuilder.getRetainedSizeInBytes(); + } + + @Override + public ListenableFuture<?> isBlocked() { + return inputOperator.isBlocked(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1bc788251a7..cb125831cc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -101,6 +101,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.exp import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.IrRowPatternToProgramRewriter; import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Matcher; import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Program; +import org.apache.iotdb.db.queryengine.execution.operator.process.TableChangePointOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.window.RowNumberOperator; import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.window.TopKRankingOperator; @@ -208,6 +209,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; @@ -4306,6 +4308,32 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution 10_000); } + @Override + public Operator visitChangePoint(ChangePointNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + TableChangePointOperator.class.getSimpleName()); + + Map<Symbol, Integer> childLayout = + makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); + List<TSDataType> childOutputTypes = + getOutputColumnTypes(node.getChild(), context.getTypeProvider()); + + Integer measurementChannel = childLayout.get(node.getMeasurementSymbol()); + if (measurementChannel == null) { + throw new IllegalStateException( + "Measurement symbol not found in child output: " + node.getMeasurementSymbol()); + } + + return new TableChangePointOperator( + operatorContext, child, measurementChannel, childOutputTypes); + } + @Override public Operator visitTopKRanking(TopKRankingNode node, LocalExecutionPlanContext 
context) { Operator child = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 55aaefe8be6..d56ae531580 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -129,6 +129,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; @@ -323,6 +324,7 @@ public enum PlanNodeType { TABLE_TOPK_RANKING_NODE((short) 1037), TABLE_ROW_NUMBER_NODE((short) 1038), TABLE_VALUES_NODE((short) 1039), + TABLE_CHANGE_POINT_NODE((short) 1040), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -727,6 +729,8 @@ public enum PlanNodeType { return RowNumberNode.deserialize(buffer); case 1039: return ValuesNode.deserialize(buffer); + case 1040: + return ChangePointNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 44f1cd8bc1f..944824f7f91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -134,6 +134,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; @@ -799,6 +800,10 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } + public R visitChangePoint(ChangePointNode node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitValuesNode(ValuesNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f..bff078a811f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; @@ -1877,6 +1878,21 @@ public class TableDistributedPlanGenerator } } + @Override + public List<PlanNode> visitChangePoint(ChangePointNode node, PlanContext context) { + if (node.getChildren().isEmpty()) { + return Collections.singletonList(node); + } + + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } else { + return splitForEachChild(node, childrenNodes); + } + } + @Override public List<PlanNode> visitTopKRanking(TopKRankingNode node, PlanContext context) { Optional<OrderingScheme> orderingScheme = node.getSpecification().getOrderingScheme(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java new file mode 100644 index 00000000000..abe49a626e0 --- /dev/null +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceFilterWindowLeadWithChangePoint.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.IsNullPredicate; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; +import 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window;
+import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Replaces a Filter(Window(LEAD)) change-point detection pattern with a single ChangePointNode.
+ *
+ * <p>Matches the SQL pattern: SELECT * FROM (SELECT *, LEAD(col) OVER (PARTITION BY ... ORDER BY
+ * ...) AS next FROM t) WHERE col != next OR next IS NULL
+ *
+ * <p>Only a single-function window whose function is {@code lead} over a plain column, with an
+ * implicit offset or a literal offset of 1, is rewritten.
+ */
+public class ReplaceFilterWindowLeadWithChangePoint implements Rule<FilterNode> {
+
+  private static final Capture<WindowNode> WINDOW_CAPTURE = newCapture();
+
+  private final Pattern<FilterNode> pattern;
+
+  public ReplaceFilterWindowLeadWithChangePoint() {
+    this.pattern =
+        filter()
+            .with(
+                source()
+                    .matching(
+                        window()
+                            .matching(ReplaceFilterWindowLeadWithChangePoint::isLeadWindow)
+                            .capturedAs(WINDOW_CAPTURE)));
+  }
+
+  @Override
+  public Pattern<FilterNode> getPattern() {
+    return pattern;
+  }
+
+  @Override
+  public Result apply(FilterNode filterNode, Captures captures, Context context) {
+    WindowNode windowNode = captures.get(WINDOW_CAPTURE);
+
+    // isLeadWindow guarantees exactly one function, so getOnlyElement cannot throw here.
+    Map.Entry<Symbol, WindowNode.Function> entry =
+        getOnlyElement(windowNode.getWindowFunctions().entrySet());
+    Symbol nextSymbol = entry.getKey();
+    WindowNode.Function function = entry.getValue();
+
+    // Defensive re-check of what isLeadWindow already verified, so the cast below is safe.
+    List<Expression> arguments = function.getArguments();
+    if (arguments.isEmpty() || !(arguments.get(0) instanceof SymbolReference)) {
+      return Result.empty();
+    }
+
+    String measurementName = ((SymbolReference) arguments.get(0)).getName();
+    Symbol measurementSymbol = new Symbol(measurementName);
+
+    if (!isChangePointPredicate(filterNode.getPredicate(), measurementName, nextSymbol.getName())) {
+      return Result.empty();
+    }
+
+    // NOTE(review): only the window's child is forwarded; its PARTITION BY / ORDER BY
+    // specification is dropped. Confirm the ChangePointNode operator never compares rows across
+    // partitions, otherwise a partition's last row can be lost when the next partition starts
+    // with the same value.
+    return Result.ofPlanNode(
+        new ChangePointNode(
+            filterNode.getPlanNodeId(), windowNode.getChild(), measurementSymbol, nextSymbol));
+  }
+
+  /** Returns whether {@code window} computes exactly one {@code lead(col[, 1])} call. */
+  private static boolean isLeadWindow(WindowNode window) {
+    if (window.getWindowFunctions().size() != 1) {
+      return false;
+    }
+
+    WindowNode.Function function = getOnlyElement(window.getWindowFunctions().values());
+    String functionName = function.getResolvedFunction().getSignature().getName();
+    if (!"lead".equals(functionName)) {
+      return false;
+    }
+
+    // LEAD with default offset (1 argument) or explicit offset=1 (2 arguments) over a plain
+    // column; any other arity (e.g. a third, default-value argument) is not rewritten.
+    List<Expression> arguments = function.getArguments();
+    if (arguments.isEmpty()
+        || arguments.size() > 2
+        || !(arguments.get(0) instanceof SymbolReference)) {
+      return false;
+    }
+    if (arguments.size() == 1) {
+      return true;
+    }
+
+    Expression offsetExpr = arguments.get(1);
+    if (!(offsetExpr instanceof Literal)) {
+      return false;
+    }
+    Object val = ((Literal) offsetExpr).getTsValue();
+    return val instanceof Number && ((Number) val).longValue() == 1;
+  }
+
+  /**
+   * Checks if the predicate matches: col != next OR next IS NULL, in either order of the OR terms.
+   */
+  private static boolean isChangePointPredicate(
+      Expression predicate, String measurementName, String nextName) {
+    if (!(predicate instanceof LogicalExpression)) {
+      return false;
+    }
+
+    LogicalExpression logical = (LogicalExpression) predicate;
+    if (logical.getOperator() != LogicalExpression.Operator.OR) {
+      return false;
+    }
+
+    List<Expression> terms = logical.getTerms();
+    if (terms.size() != 2) {
+      return false;
+    }
+
+    Expression first = terms.get(0);
+    Expression second = terms.get(1);
+
+    return (isNotEqualComparison(first, measurementName, nextName)
+            && isNullCheck(second, nextName))
+        || (isNullCheck(first, nextName)
+            && isNotEqualComparison(second, measurementName, nextName));
+  }
+
+  /** Matches {@code measurement <> next}, with the operands in either order. */
+  private static boolean isNotEqualComparison(
+      Expression expr, String measurementName, String nextName) {
+    if (!(expr instanceof ComparisonExpression)) {
+      return false;
+    }
+    ComparisonExpression comparison = (ComparisonExpression) expr;
+    if (comparison.getOperator() != ComparisonExpression.Operator.NOT_EQUAL) {
+      return false;
+    }
+
+    Expression left = comparison.getLeft();
+    Expression right = comparison.getRight();
+    return (isSymbolRef(left, measurementName) && isSymbolRef(right, nextName))
+        || (isSymbolRef(left, nextName) && isSymbolRef(right, measurementName));
+  }
+
+  /** Matches {@code symbolName IS NULL}. */
+  private static boolean isNullCheck(Expression expr, String symbolName) {
+    if (!(expr instanceof IsNullPredicate)) {
+      return false;
+    }
+    return isSymbolRef(((IsNullPredicate) expr).getValue(), symbolName);
+  }
+
+  /** Returns whether {@code expr} is a reference to the symbol named {@code name}. */
+  private static boolean isSymbolRef(Expression expr, String name) {
+    return expr instanceof SymbolReference && ((SymbolReference) expr).getName().equals(name);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
new file mode 100644
index 00000000000..02b567c8434
---
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ChangePointNode.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Keeps the last row of every run of consecutive identical {@code measurementSymbol} values.
+ *
+ * <p>Replaces a {@code Filter(Window(LEAD(...)))} pattern. The final input row always ends a run,
+ * so it is always emitted. Output is every child symbol followed by {@code nextSymbol}. The node
+ * carries no partitioning or ordering of its own; the child is expected to deliver rows sorted (and
+ * partitioned) the way the replaced window required.
+ */
+public class ChangePointNode extends SingleChildProcessNode {
+
+  /** Column whose value changes are detected. */
+  private final Symbol measurementSymbol;
+
+  /** Symbol that held the replaced LEAD result; appended to the child's output symbols. */
+  private final Symbol nextSymbol;
+
+  /**
+   * Creates a node reading from {@code child}.
+   *
+   * @param id plan node id
+   * @param child input whose rows are checked for change points
+   * @param measurementSymbol column whose value changes are detected
+   * @param nextSymbol symbol appended to the child's output symbols
+   */
+  public ChangePointNode(
+      PlanNodeId id, PlanNode child, Symbol measurementSymbol, Symbol nextSymbol) {
+    super(id, child);
+    this.measurementSymbol = measurementSymbol;
+    this.nextSymbol = nextSymbol;
+  }
+
+  /** Creates a node without a child, as done by {@link #clone()} and {@link #deserialize}. */
+  public ChangePointNode(PlanNodeId id, Symbol measurementSymbol, Symbol nextSymbol) {
+    super(id);
+    this.measurementSymbol = measurementSymbol;
+    this.nextSymbol = nextSymbol;
+  }
+
+  public Symbol getMeasurementSymbol() {
+    return measurementSymbol;
+  }
+
+  public Symbol getNextSymbol() {
+    return nextSymbol;
+  }
+
+  @Override
+  public PlanNode clone() {
+    // Copies the attributes only; the child is not cloned.
+    return new ChangePointNode(getPlanNodeId(), measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitChangePoint(this, context);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    // Not supported by this node; use getOutputSymbols() instead.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    // Child symbols pass through, with nextSymbol appended as the last column.
+    return ImmutableList.<Symbol>builder()
+        .addAll(getChild().getOutputSymbols())
+        .add(nextSymbol)
+        .build();
+  }
+
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.size() == 1, "wrong number of new children");
+    return new ChangePointNode(id, newChildren.get(0), measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Field order must stay in sync with deserialize(ByteBuffer).
+    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(byteBuffer);
+    Symbol.serialize(measurementSymbol, byteBuffer);
+    Symbol.serialize(nextSymbol, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    // Field order must stay in sync with deserialize(ByteBuffer).
+    PlanNodeType.TABLE_CHANGE_POINT_NODE.serialize(stream);
+    Symbol.serialize(measurementSymbol, stream);
+    Symbol.serialize(nextSymbol, stream);
+  }
+
+  public static ChangePointNode deserialize(ByteBuffer buffer) {
+    // Reads the symbols in serializeAttributes order (the leading node type is not read here,
+    // presumably consumed by the caller), then the plan node id; the child is not read here.
+    Symbol measurementSymbol = Symbol.deserialize(buffer);
+    Symbol nextSymbol = Symbol.deserialize(buffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+    return new ChangePointNode(planNodeId, measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    ChangePointNode that = (ChangePointNode) o;
+    return Objects.equals(measurementSymbol, that.measurementSymbol)
+        && Objects.equals(nextSymbol, that.nextSymbol);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), measurementSymbol, nextSymbol);
+  }
+
+  @Override
+  public String toString() {
+    return "ChangePoint-" + this.getPlanNodeId();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index c9bf429b183..5e632b2020a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -235,6 +235,10 @@ public final class Patterns {
     return typeOf(WindowNode.class);
   }
 
+  public static Pattern<ChangePointNode> changePoint() {
+    return typeOf(ChangePointNode.class);
+  }
+
   public static Pattern<GroupNode> groupNode() {
     return typeOf(GroupNode.class);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
index 3d688610f01..ead8e9e43f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DistributedOptimizeFactory.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.El
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithMergeSort;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithMergeSort;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownOffsetIntoTableScan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ReplaceFilterWindowLeadWithChangePoint;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -41,6 +42,11 @@ public class DistributedOptimizeFactory {
 
     this.planOptimizers =
         ImmutableList.of(
+            // replace Filter(Window(LEAD)) change-point pattern with ChangePointNode
+            new IterativeOptimizer(
+                plannerContext,
+                ruleStats,
+                ImmutableSet.of(new ReplaceFilterWindowLeadWithChangePoint())),
             // transfer Limit+Sort to TopK
             new IterativeOptimizer(
                 plannerContext,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java
new file mode 100644
index 00000000000..66312bb6a19
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ChangePointOptimizationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode;
+
+import org.junit.Test;
+
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.changePoint;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+import static org.junit.Assert.assertFalse;
+
+/** Plan-shape tests for the {@code ReplaceFilterWindowLeadWithChangePoint} rewrite. */
+public class ChangePointOptimizationTest {
+
+  @Test
+  public void testChangePointOptimization() {
+    PlanTester planTester = new PlanTester();
+
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 <> next OR next IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Logical plan still has Filter -> Window (optimization happens at distributed level)
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // Distributed plan: ChangePointNode replaces Filter -> Window per partition
+    // Fragment 0: Output -> Collect -> Exchange*
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange())));
+    // Fragment 1+: ChangePointNode -> TableScan
+    assertPlan(planTester.getFragmentPlan(1), changePoint(tableScan));
+    assertPlan(planTester.getFragmentPlan(2), changePoint(tableScan));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithLag() {
+    PlanTester planTester = new PlanTester();
+
+    // LAG instead of LEAD should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LAG(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS prev FROM table1) WHERE s1 <> prev OR prev IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Should remain Filter -> Window in the logical plan
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // Distributed plan: should still have Filter -> Window per partition (no ChangePoint)
+    assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan)));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithDifferentPredicate() {
+    PlanTester planTester = new PlanTester();
+
+    // Different predicate (s1 = next instead of s1 != next) should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next FROM table1) WHERE s1 = next";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // Distributed plan: should still have Filter -> Window (no ChangePoint)
+    assertPlan(planTester.getFragmentPlan(1), filter(window(tableScan)));
+  }
+
+  @Test
+  public void testChangePointNotMatchedWithMultipleWindowFunctions() {
+    PlanTester planTester = new PlanTester();
+
+    // Multiple window functions should NOT be optimized
+    String sql =
+        "SELECT * FROM (SELECT *, LEAD(s1) OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS next, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) AS rn FROM table1) WHERE s1 <> next OR next IS NULL";
+
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+    // Should not be transformed to ChangePoint because there are multiple window functions
+    assertPlan(logicalQueryPlan, output(filter(window(group(tableScan)))));
+
+    // The rewrite runs in the distributed optimizers, so the logical plan above cannot show
+    // whether it fired; check the distributed fragment too.
+    assertFalse(containsChangePoint(planTester.getFragmentPlan(1)));
+  }
+
+  /** Returns whether {@code node} or any node below it is a {@link ChangePointNode}. */
+  private static boolean containsChangePoint(PlanNode node) {
+    if (node instanceof ChangePointNode) {
+      return true;
+    }
+    for (PlanNode child : node.getChildren()) {
+      if (containsChangePoint(child)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec2..e30f1967cf9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChangePointNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; @@ -481,6 +482,10 @@ public final class PlanMatchPattern { return node(RowNumberNode.class, source); } + public static PlanMatchPattern changePoint(PlanMatchPattern source) { + return node(ChangePointNode.class, source); + } + public static PlanMatchPattern markDistinct( String markerSymbol, List<String> distinctSymbols, PlanMatchPattern source) { return node(MarkDistinctNode.class, source)
