This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 16ad6c1a686 Resolve IndexOutOfPartitionBound and symbol not found in window functions.
16ad6c1a686 is described below
commit 16ad6c1a686b7e565b5c64a672da9749228fc29e
Author: Zhihao Shen <[email protected]>
AuthorDate: Mon Jun 30 09:42:40 2025 +0800
Resolve IndexOutOfPartitionBound and symbol not found in window functions.
---
.../relational/it/db/it/IoTDBWindowFunctionIT.java | 63 ++++++++++++++++++++++
.../process/window/TableWindowOperator.java | 6 ++-
.../window/partition/PartitionExecutor.java | 11 ++--
.../planner/optimizations/SymbolMapper.java | 10 +++-
4 files changed, 82 insertions(+), 8 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionIT.java
index 1be98fa7bf4..d99de124f24 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionIT.java
@@ -72,6 +72,21 @@ public class IoTDBWindowFunctionIT {
"FLUSH",
"CLEAR ATTRIBUTE CACHE",
};
+ private static final String[] normalSqls =
+ new String[] {
+ "create table demo3 (Device string tag, t2 string tag, t3 string tag, a1 string attribute, Flow int64 field, f2 int32 field, f3 double field,f4 float field, date date field, timestamp timestamp field, string string field)",
+ "insert into demo3 values (1970-01-01T08:00:00.000+08:00, 'd0', 'd0', 'd0', 'd0', 3, 3, 3.0, 3.0, '1970-01-01', 3, '3')",
+ "insert into demo3 values (1970-01-01T08:00:00.001+08:00, 'd0', 'd0', 'd0', 'd0', 5, 5, 5.0, 5.0, '1970-01-01', 5, '5')",
+ "insert into demo3 values (1970-01-01T08:00:00.002+08:00, 'd0', 'd0', 'd0', 'd0', 3, 3, 3.0, 3.0, '1970-01-01', 3, '3')",
+ "insert into demo3 values (1970-01-01T08:00:00.003+08:00, 'd0', 'd0', 'd0', 'd0', 1, 1, 1.0, 1.0, '1970-01-01', 1, '1')",
+ "insert into demo3 values (1970-01-01T08:00:00.004+08:00, 'd0', 'd0', 'd0', 'd0', null, null, null, null, null, null, null)",
+ "FLUSH",
+ "insert into demo3 values (1970-01-01T08:00:00.005+08:00, 'd1', 'd1', 'd1', 'd1', 2, 2, 2.0, 2.0, '1970-01-01', 2, '2')",
+ "insert into demo3 values (1970-01-01T08:00:00.006+08:00, 'd1', 'd1', 'd1', 'd1', null, null, null, null, null, null, null)",
+ "insert into demo3 values (1970-01-01T08:00:00.007+08:00, 'd1', 'd1', 'd1', 'd1', 4, 4, 4.0, 4.0, '1970-01-01', 4, '4')",
+ "insert into demo3 values (1970-01-01T08:00:00.008+08:00, null, null, null, null, null, null, null, null, null, null, null)",
+ "CLEAR ATTRIBUTE CACHE",
+ };
private static void insertData() {
try (Connection connection = EnvFactory.getEnv().getTableConnection();
@@ -82,6 +97,9 @@ public class IoTDBWindowFunctionIT {
for (String sql : sqlsWithNulls) {
statement.execute(sql);
}
+ for (String sql : normalSqls) {
+ statement.execute(sql);
+ }
} catch (Exception e) {
fail("insertData failed.");
}
@@ -576,4 +594,49 @@ public class IoTDBWindowFunctionIT {
"Window frame offset value must not be negative or null",
DATABASE_NAME);
}
+
+ @Test
+ public void testMultiPartitions() {
+ String[] expectedHeader = new String[] {"time", "device", "flow", "cnt"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d0,3,4,",
+ "1970-01-01T00:00:00.001Z,d0,5,4,",
+ "1970-01-01T00:00:00.002Z,d0,3,4,",
+ "1970-01-01T00:00:00.003Z,d0,1,4,",
+ "1970-01-01T00:00:00.004Z,d0,null,4,",
+ "1970-01-01T00:00:00.005Z,d1,2,2,",
+ "1970-01-01T00:00:00.006Z,d1,null,2,",
+ "1970-01-01T00:00:00.007Z,d1,4,2,",
+ "1970-01-01T00:00:00.008Z,null,null,0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT time, device, flow, count(flow) OVER(PARTITION BY device ORDER BY flow ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS cnt FROM demo3 ORDER BY time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testComplexQuery() {
+ String[] expectedHeader = new String[] {"flow", "_col1"};
+ String[] retArray =
+ new String[] {
+ "1,null,",
+ "3,1970-01-01T00:00:00.003Z,",
+ "5,1970-01-01T00:00:00.002Z,",
+ "null,1970-01-01T00:00:00.001Z,",
+ };
+ tableResultSetEqualTest(
+ " SELECT flow, \n"
+ + " lag(last(time)) OVER (order by flow)\n"
+ + " FROM VARIATION(\n"
+ + " DATA => (SELECT time, flow FROM demo3 WHERE device = 'd0'),\n"
+ + " COL => 'flow',\n"
+ + " DELTA => 0.0)\n"
+ + " GROUP BY flow",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
index e1737ffdbcd..af46988c55e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
@@ -99,8 +99,8 @@ public class TableWindowOperator implements ProcessOperator {
this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
// Basic information part
- this.windowFunctions = windowFunctions;
- this.frameInfoList = frameInfoList;
+ this.windowFunctions = ImmutableList.copyOf(windowFunctions);
+ this.frameInfoList = ImmutableList.copyOf(frameInfoList);
// Partition Part
this.partitionChannels = ImmutableList.copyOf(partitionChannels);
@@ -313,6 +313,8 @@ public class TableWindowOperator implements ProcessOperator {
private TsBlock transform(long startTime) {
while (!cachedPartitionExecutors.isEmpty()) {
PartitionExecutor partitionExecutor = cachedPartitionExecutors.getFirst();
+ // Reset window functions for new partition
+ partitionExecutor.resetWindowFunctions();
while (System.nanoTime() - startTime < maxRuntime
&& !tsBlockBuilder.isFull()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
index 9075592a7e1..20167955d79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
@@ -88,11 +88,6 @@ public final class PartitionExecutor {
peerGroupComparator = new RowComparator(sortDataTypes);
sortedColumns = partition.getSortedColumnList(sortChannels);
- // Reset functions for new partition
- for (WindowFunction windowFunction : windowFunctions) {
- windowFunction.reset();
- }
-
currentPosition = partitionStart;
needPeerGroup =
windowFunctions.stream().anyMatch(WindowFunction::needPeerGroup)
@@ -198,4 +193,10 @@ public final class PartitionExecutor {
peerGroupEnd++;
}
}
+
+ public void resetWindowFunctions() {
+ for (WindowFunction windowFunction : windowFunctions) {
+ windowFunction.reset();
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
index 7065b899e14..db198c8ce70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
@@ -51,6 +51,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -263,10 +264,17 @@ public class SymbolMapper {
function.isIgnoreNulls()));
});
+ ImmutableList<Symbol> newPartitionBy =
+ node.getSpecification().getPartitionBy().stream().map(this::map).collect(toImmutableList());
+ Optional<OrderingScheme> newOrderingScheme =
+ node.getSpecification().getOrderingScheme().map(this::map);
+ DataOrganizationSpecification newSpecification =
+ new DataOrganizationSpecification(newPartitionBy, newOrderingScheme);
+
return new WindowNode(
node.getPlanNodeId(),
source,
- node.getSpecification(),
+ newSpecification,
newFunctions.buildOrThrow(),
node.getHashSymbol().map(this::map),
node.getPrePartitionedInputs().stream().map(this::map).collect(toImmutableSet()),