This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f829251657c Fix arrary index out-of-bounds in agg query with align by
device when cross region
f829251657c is described below
commit f829251657c0f6b4c181d3b2445f330505301649
Author: Weihao Li <[email protected]>
AuthorDate: Mon Jun 16 15:56:34 2025 +0800
Fix arrary index out-of-bounds in agg query with align by device when cross
region
---
.../plan/planner/distribution/SourceRewriter.java | 55 ++++++++++++++++++++++
.../planner/plan/node/process/AggregationNode.java | 3 +-
.../plan/node/process/RawDataAggregationNode.java | 3 +-
.../source/AlignedSeriesAggregationScanNode.java | 15 +++---
.../node/source/SeriesAggregationScanNode.java | 15 +++---
.../distribution/AggregationAlignByDeviceTest.java | 38 +++++++++++++++
.../org/apache/iotdb/commons/path/PartialPath.java | 12 +++++
7 files changed, 123 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 63d59ccaec7..d24d3bb2429 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -49,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.Horizontal
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -94,6 +96,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
@@ -241,6 +244,8 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
List<String> newPartialOutputColumns = new ArrayList<>();
Set<Expression> deviceViewOutputExpressions =
analysis.getDeviceViewOutputExpressions();
+ // Used to rewrite child ProjectNode if it exists
+ List<FunctionExpression> actualPartialAggregations = new ArrayList<>();
int i = 0, newIdxSum = 0;
for (Expression expression : deviceViewOutputExpressions) {
@@ -248,6 +253,8 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
newPartialOutputColumns.add(expression.getOutputSymbol());
i++;
newIdxSum++;
+ // just a placeholder, convenient for after process
+ actualPartialAggregations.add(null);
continue;
}
FunctionExpression aggExpression = (FunctionExpression) expression;
@@ -268,6 +275,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
.setType(partialFunctionExpression.getOutputSymbol(),
dataType);
}
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+ actualPartialAggregations.add(partialFunctionExpression);
}
newMeasurementIdxMap.put(
i++,
@@ -288,6 +296,38 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
transferAggregatorsRecursively(planNode, context);
+
+ List<String> devices = deviceViewNode.getDevices();
+ for (int j = 0; j < devices.size(); j++) {
+ if (deviceViewNode.getChildren().get(j) instanceof ProjectNode) {
+ String device = devices.get(j);
+
+ // construct output column names for each child ProjectNode
+ List<Integer> newMeasurementIdxList =
+ deviceViewNode.getDeviceToMeasurementIndexesMap().get(device);
+ List<String> newProjectOutputs =
+ newMeasurementIdxList.stream()
+ .map(
+ // process each measurement
+ measurementIdx -> {
+ FunctionExpression aggExpression =
+ actualPartialAggregations.get(measurementIdx);
+
+ // construct new FunctionExpression with device for
ProjectNode
+ List<Expression> withDeviceExpressions =
+ getWithDeviceExpressions(aggExpression, device);
+ aggExpression =
+ new FunctionExpression(
+ aggExpression.getFunctionName(),
+ aggExpression.getFunctionAttributes(),
+ withDeviceExpressions);
+ return aggExpression.getExpressionString();
+ })
+ .collect(Collectors.toList());
+ ((ProjectNode) deviceViewNode.getChildren().get(j))
+ .setOutputColumnNames(newProjectOutputs);
+ }
+ }
}
boolean hasGroupBy =
@@ -312,6 +352,21 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
}
+ private static List<Expression> getWithDeviceExpressions(
+ FunctionExpression aggExpression, String device) {
+ return aggExpression.getExpressions().stream()
+ .map(
+ // process each argument of FunctionExpression
+ argument -> {
+ checkArgument(
+ argument instanceof TimeSeriesOperand,
+ "Argument of AggregationFunction should be TimeSeriesOperand
here");
+ return new TimeSeriesOperand(
+ new PartialPath(device, argument.getExpressionString(),
false));
+ })
+ .collect(Collectors.toList());
+ }
+
/**
* aggregation align by device, and aggregation is `count_if` or `diff`, or
aggregation used with
* group by parameter (session, variation, count), use the old aggregation
logic
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
index 0f81707bbea..b7e1aea74f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
@@ -96,6 +96,7 @@ public class AggregationNode extends MultiChildProcessNode {
this.scanOrder = scanOrder;
}
+ // used by clone & deserialize
public AggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
@@ -105,7 +106,7 @@ public class AggregationNode extends MultiChildProcessNode {
boolean outputEndTime,
Ordering scanOrder) {
super(id, new ArrayList<>());
- this.aggregationDescriptorList =
getDeduplicatedDescriptors(aggregationDescriptorList);
+ this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = scanOrder;
this.groupByParameter = groupByParameter;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
index dafcbdb9409..24a207184e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -89,6 +89,7 @@ public class RawDataAggregationNode extends
SingleChildProcessNode {
this.scanOrder = scanOrder;
}
+ // used by clone & deserialize
public RawDataAggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
@@ -98,7 +99,7 @@ public class RawDataAggregationNode extends
SingleChildProcessNode {
boolean outputEndTime,
Ordering scanOrder) {
super(id);
- this.aggregationDescriptorList =
getDeduplicatedDescriptors(aggregationDescriptorList);
+ this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = scanOrder;
this.groupByParameter = groupByParameter;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 6e53f5d95d7..b16b723c44b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -99,6 +99,7 @@ public class AlignedSeriesAggregationScanNode extends
SeriesAggregationSourceNod
this.regionReplicaSet = dataRegionReplicaSet;
}
+ // used by clone & deserialize
public AlignedSeriesAggregationScanNode(
PlanNodeId id,
AlignedPath alignedPath,
@@ -109,14 +110,12 @@ public class AlignedSeriesAggregationScanNode extends
SeriesAggregationSourceNod
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet,
byte descriptorType) {
- this(
- id,
- alignedPath,
- aggregationDescriptorList,
- scanOrder,
- pushDownPredicate,
- groupByTimeParameter,
- dataRegionReplicaSet);
+ super(id, aggregationDescriptorList);
+ this.alignedPath = alignedPath;
+ this.scanOrder = scanOrder;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.pushDownPredicate = pushDownPredicate;
+ this.regionReplicaSet = dataRegionReplicaSet;
setOutputEndTime(outputEndTime);
setDescriptorType(descriptorType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index 01e801cad22..54a186b98ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -105,6 +105,7 @@ public class SeriesAggregationScanNode extends
SeriesAggregationSourceNode {
this.regionReplicaSet = dataRegionReplicaSet;
}
+ // used by clone & deserialize
public SeriesAggregationScanNode(
PlanNodeId id,
MeasurementPath seriesPath,
@@ -114,14 +115,12 @@ public class SeriesAggregationScanNode extends
SeriesAggregationSourceNode {
@Nullable Expression pushDownPredicate,
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet) {
- this(
- id,
- seriesPath,
- aggregationDescriptorList,
- scanOrder,
- pushDownPredicate,
- groupByTimeParameter,
- dataRegionReplicaSet);
+ super(id, aggregationDescriptorList);
+ this.seriesPath = seriesPath;
+ this.scanOrder = scanOrder;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.pushDownPredicate = pushDownPredicate;
+ this.regionReplicaSet = dataRegionReplicaSet;
setOutputEndTime(outputEndTime);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
index f6bb0deea1c..185db3a9cf6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
@@ -39,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -428,4 +430,40 @@ public class AggregationAlignByDeviceTest {
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof
HorizontallyConcatNode);
}
+
+ @Test
+ public void crossRegionTest() {
+ // one aggregation measurement, two devices
+ sql = "select last_value(s1),last_value(s2)from root.sg.d1 align by
device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot =
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertTrue(firstFiRoot instanceof AggregationMergeSortNode);
+ assertTrue(firstFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ if (firstFiRoot.getChildren().get(0).getChildren().get(0) instanceof
ProjectNode) {
+ assertEquals(
+
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
+ ImmutableList.of(
+ "last_value(root.sg.d1.s1)",
+ "max_time(root.sg.d1.s1)",
+ "last_value(root.sg.d1.s2)",
+ "max_time(root.sg.d1.s2)"));
+ }
+
+ secondFiRoot =
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertTrue(secondFiRoot instanceof DeviceViewNode);
+ if (secondFiRoot.getChildren().get(0) instanceof ProjectNode) {
+ assertEquals(
+
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
+ ImmutableList.of(
+ "last_value(root.sg.d1.s1)",
+ "max_time(root.sg.d1.s1)",
+ "last_value(root.sg.d1.s2)",
+ "max_time(root.sg.d1.s2)"));
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 59a64352af1..ceec1f17c4e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -118,6 +118,18 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
}
}
+ /**
+ * only use this method in following situations: 1. you are sure you do not
want to split the
+ * path. 2. you are sure path is correct.
+ *
+ * @param needSplit whether to split path to nodes, needSplit can only be
false.
+ */
+ public PartialPath(String device, String measurement, boolean needSplit) {
+ Validate.isTrue(!needSplit);
+ String path = device + TsFileConstant.PATH_SEPARATOR + measurement;
+ this.nodes = new String[] {path};
+ }
+
public boolean hasWildcard() {
for (String node : nodes) {
// *, ** , d*, *d*