This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch rc/2.0.10
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d28ae2679bdf10056365e8c56d2f4e72f36a7a39 Author: Caideyipi <[email protected]> AuthorDate: Fri Jun 5 13:52:20 2026 +0800 Fix window function state reset across batches (#17813) --- .../db/it/IoTDBWindowFunctionBatchedResultIT.java | 94 ++++++++++++++++++++++ .../process/window/TableWindowOperator.java | 2 +- .../window/partition/PartitionExecutor.java | 11 ++- 3 files changed, 105 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java new file mode 100644 index 00000000000..ad7bc73fb8f --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBWindowFunctionBatchedResultIT { + private static final String DATABASE_NAME = "test"; + + private static final String[] SQLS = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE batched_rank (device STRING TAG, value INT32 FIELD)", + "INSERT INTO batched_rank VALUES (2021-01-01T00:00:01, 'd1', 1)", + "INSERT INTO batched_rank VALUES (2021-01-01T00:00:02, 'd1', 2)", + "INSERT INTO batched_rank VALUES (2021-01-01T00:00:03, 'd1', 3)", + "INSERT INTO batched_rank VALUES (2021-01-01T00:00:04, 'd1', 4)", + "INSERT INTO batched_rank VALUES (2021-01-01T00:00:05, 'd1', 5)", + "FLUSH", + "CLEAR ATTRIBUTE CACHE", + }; + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockLineNumber(2); + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRankStateAcrossOutputBatches() { + String[] expectedHeader = new String[] {"value", "rk"}; + String[] retArray = + new String[] { + "1,1,", "2,2,", "3,3,", "4,4,", "5,5,", + }; + tableResultSetEqualTest( + "SELECT value, rank() OVER (PARTITION BY device ORDER BY 
value) AS rk FROM batched_rank ORDER BY value", + expectedHeader, + retArray, + DATABASE_NAME); + } + + private static void insertData() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : SQLS) { + statement.execute(sql); + } + } catch (Exception e) { + fail("insertData failed."); + } + } +} diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java index 8dde1145696..bdbabcefa3d 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java @@ -218,7 +218,7 @@ public class TableWindowOperator implements ProcessOperator { private TsBlock transform(long startTime) { while (!cachedPartitionExecutors.isEmpty()) { PartitionExecutor partitionExecutor = cachedPartitionExecutors.getFirst(); - partitionExecutor.resetWindowFunctions(); + partitionExecutor.initializeWindowFunctions(); while (System.nanoTime() - startTime < maxRuntime && !tsBlockBuilder.isFull() diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java index 673e971a671..5cfcf3ac173 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java @@ -59,6 +59,7 @@ public final class PartitionExecutor { private final 
List<Frame> frames; private final boolean needPeerGroup; + private boolean windowFunctionsInitialized; public PartitionExecutor( List<TsBlock> tsBlocks, @@ -114,6 +115,7 @@ public final class PartitionExecutor { sortedColumns = partition.getSortedColumnList(sortChannels); currentPosition = partitionStart; + windowFunctionsInitialized = false; needPeerGroup = windowFunctions.stream().anyMatch(WindowFunction::needPeerGroup) || frameInfoList.stream() @@ -212,9 +214,16 @@ public final class PartitionExecutor { } } - public void resetWindowFunctions() { + private void resetWindowFunctions() { for (WindowFunction windowFunction : windowFunctions) { windowFunction.reset(); } } + + public void initializeWindowFunctions() { + if (!windowFunctionsInitialized) { + resetWindowFunctions(); + windowFunctionsInitialized = true; + } + } }
