This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f829251657c Fix arrary index out-of-bounds in agg query with align by 
device when cross region
f829251657c is described below

commit f829251657c0f6b4c181d3b2445f330505301649
Author: Weihao Li <[email protected]>
AuthorDate: Mon Jun 16 15:56:34 2025 +0800

    Fix arrary index out-of-bounds in agg query with align by device when cross 
region
---
 .../plan/planner/distribution/SourceRewriter.java  | 55 ++++++++++++++++++++++
 .../planner/plan/node/process/AggregationNode.java |  3 +-
 .../plan/node/process/RawDataAggregationNode.java  |  3 +-
 .../source/AlignedSeriesAggregationScanNode.java   | 15 +++---
 .../node/source/SeriesAggregationScanNode.java     | 15 +++---
 .../distribution/AggregationAlignByDeviceTest.java | 38 +++++++++++++++
 .../org/apache/iotdb/commons/path/PartialPath.java | 12 +++++
 7 files changed, 123 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 63d59ccaec7..d24d3bb2429 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -49,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.Horizontal
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -94,6 +96,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
@@ -241,6 +244,8 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
       Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
       List<String> newPartialOutputColumns = new ArrayList<>();
       Set<Expression> deviceViewOutputExpressions = 
analysis.getDeviceViewOutputExpressions();
+      // Used to rewrite child ProjectNode if it exists
+      List<FunctionExpression> actualPartialAggregations = new ArrayList<>();
 
       int i = 0, newIdxSum = 0;
       for (Expression expression : deviceViewOutputExpressions) {
@@ -248,6 +253,8 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           newPartialOutputColumns.add(expression.getOutputSymbol());
           i++;
           newIdxSum++;
+          // just a placeholder, convenient for after process
+          actualPartialAggregations.add(null);
           continue;
         }
         FunctionExpression aggExpression = (FunctionExpression) expression;
@@ -268,6 +275,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                 .setType(partialFunctionExpression.getOutputSymbol(), 
dataType);
           }
           
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+          actualPartialAggregations.add(partialFunctionExpression);
         }
         newMeasurementIdxMap.put(
             i++,
@@ -288,6 +296,38 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
         deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
         transferAggregatorsRecursively(planNode, context);
+
+        List<String> devices = deviceViewNode.getDevices();
+        for (int j = 0; j < devices.size(); j++) {
+          if (deviceViewNode.getChildren().get(j) instanceof ProjectNode) {
+            String device = devices.get(j);
+
+            // construct output column names for each child ProjectNode
+            List<Integer> newMeasurementIdxList =
+                deviceViewNode.getDeviceToMeasurementIndexesMap().get(device);
+            List<String> newProjectOutputs =
+                newMeasurementIdxList.stream()
+                    .map(
+                        // process each measurement
+                        measurementIdx -> {
+                          FunctionExpression aggExpression =
+                              actualPartialAggregations.get(measurementIdx);
+
+                          // construct new FunctionExpression with device for 
ProjectNode
+                          List<Expression> withDeviceExpressions =
+                              getWithDeviceExpressions(aggExpression, device);
+                          aggExpression =
+                              new FunctionExpression(
+                                  aggExpression.getFunctionName(),
+                                  aggExpression.getFunctionAttributes(),
+                                  withDeviceExpressions);
+                          return aggExpression.getExpressionString();
+                        })
+                    .collect(Collectors.toList());
+            ((ProjectNode) deviceViewNode.getChildren().get(j))
+                .setOutputColumnNames(newProjectOutputs);
+          }
+        }
       }
 
       boolean hasGroupBy =
@@ -312,6 +352,21 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
     }
   }
 
+  private static List<Expression> getWithDeviceExpressions(
+      FunctionExpression aggExpression, String device) {
+    return aggExpression.getExpressions().stream()
+        .map(
+            // process each argument of FunctionExpression
+            argument -> {
+              checkArgument(
+                  argument instanceof TimeSeriesOperand,
+                  "Argument of AggregationFunction should be TimeSeriesOperand 
here");
+              return new TimeSeriesOperand(
+                  new PartialPath(device, argument.getExpressionString(), 
false));
+            })
+        .collect(Collectors.toList());
+  }
+
   /**
    * aggregation align by device, and aggregation is `count_if` or `diff`, or 
aggregation used with
    * group by parameter (session, variation, count), use the old aggregation 
logic
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
index 0f81707bbea..b7e1aea74f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java
@@ -96,6 +96,7 @@ public class AggregationNode extends MultiChildProcessNode {
     this.scanOrder = scanOrder;
   }
 
+  // used by clone & deserialize
   public AggregationNode(
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
@@ -105,7 +106,7 @@ public class AggregationNode extends MultiChildProcessNode {
       boolean outputEndTime,
       Ordering scanOrder) {
     super(id, new ArrayList<>());
-    this.aggregationDescriptorList = 
getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.aggregationDescriptorList = aggregationDescriptorList;
     this.groupByTimeParameter = groupByTimeParameter;
     this.scanOrder = scanOrder;
     this.groupByParameter = groupByParameter;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
index dafcbdb9409..24a207184e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -89,6 +89,7 @@ public class RawDataAggregationNode extends 
SingleChildProcessNode {
     this.scanOrder = scanOrder;
   }
 
+  // used by clone & deserialize
   public RawDataAggregationNode(
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
@@ -98,7 +99,7 @@ public class RawDataAggregationNode extends 
SingleChildProcessNode {
       boolean outputEndTime,
       Ordering scanOrder) {
     super(id);
-    this.aggregationDescriptorList = 
getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.aggregationDescriptorList = aggregationDescriptorList;
     this.groupByTimeParameter = groupByTimeParameter;
     this.scanOrder = scanOrder;
     this.groupByParameter = groupByParameter;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 6e53f5d95d7..b16b723c44b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -99,6 +99,7 @@ public class AlignedSeriesAggregationScanNode extends 
SeriesAggregationSourceNod
     this.regionReplicaSet = dataRegionReplicaSet;
   }
 
+  // used by clone & deserialize
   public AlignedSeriesAggregationScanNode(
       PlanNodeId id,
       AlignedPath alignedPath,
@@ -109,14 +110,12 @@ public class AlignedSeriesAggregationScanNode extends 
SeriesAggregationSourceNod
       @Nullable GroupByTimeParameter groupByTimeParameter,
       TRegionReplicaSet dataRegionReplicaSet,
       byte descriptorType) {
-    this(
-        id,
-        alignedPath,
-        aggregationDescriptorList,
-        scanOrder,
-        pushDownPredicate,
-        groupByTimeParameter,
-        dataRegionReplicaSet);
+    super(id, aggregationDescriptorList);
+    this.alignedPath = alignedPath;
+    this.scanOrder = scanOrder;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.pushDownPredicate = pushDownPredicate;
+    this.regionReplicaSet = dataRegionReplicaSet;
     setOutputEndTime(outputEndTime);
     setDescriptorType(descriptorType);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index 01e801cad22..54a186b98ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -105,6 +105,7 @@ public class SeriesAggregationScanNode extends 
SeriesAggregationSourceNode {
     this.regionReplicaSet = dataRegionReplicaSet;
   }
 
+  // used by clone & deserialize
   public SeriesAggregationScanNode(
       PlanNodeId id,
       MeasurementPath seriesPath,
@@ -114,14 +115,12 @@ public class SeriesAggregationScanNode extends 
SeriesAggregationSourceNode {
       @Nullable Expression pushDownPredicate,
       @Nullable GroupByTimeParameter groupByTimeParameter,
       TRegionReplicaSet dataRegionReplicaSet) {
-    this(
-        id,
-        seriesPath,
-        aggregationDescriptorList,
-        scanOrder,
-        pushDownPredicate,
-        groupByTimeParameter,
-        dataRegionReplicaSet);
+    super(id, aggregationDescriptorList);
+    this.seriesPath = seriesPath;
+    this.scanOrder = scanOrder;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.pushDownPredicate = pushDownPredicate;
+    this.regionReplicaSet = dataRegionReplicaSet;
     setOutputEndTime(outputEndTime);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
index f6bb0deea1c..185db3a9cf6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -428,4 +430,40 @@ public class AggregationAlignByDeviceTest {
     assertTrue(
         firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
HorizontallyConcatNode);
   }
+
+  @Test
+  public void crossRegionTest() {
+    // one aggregation measurement, two devices
+    sql = "select last_value(s1),last_value(s2)from root.sg.d1 align by 
device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = 
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertTrue(firstFiRoot instanceof AggregationMergeSortNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    if (firstFiRoot.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode) {
+      assertEquals(
+          
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
+          ImmutableList.of(
+              "last_value(root.sg.d1.s1)",
+              "max_time(root.sg.d1.s1)",
+              "last_value(root.sg.d1.s2)",
+              "max_time(root.sg.d1.s2)"));
+    }
+
+    secondFiRoot = 
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertTrue(secondFiRoot instanceof DeviceViewNode);
+    if (secondFiRoot.getChildren().get(0) instanceof ProjectNode) {
+      assertEquals(
+          
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
+          ImmutableList.of(
+              "last_value(root.sg.d1.s1)",
+              "max_time(root.sg.d1.s1)",
+              "last_value(root.sg.d1.s2)",
+              "max_time(root.sg.d1.s2)"));
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 59a64352af1..ceec1f17c4e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -118,6 +118,18 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
     }
   }
 
+  /**
+   * only use this method in following situations: 1. you are sure you do not 
want to split the
+   * path. 2. you are sure path is correct.
+   *
+   * @param needSplit whether to split path to nodes, needSplit can only be 
false.
+   */
+  public PartialPath(String device, String measurement, boolean needSplit) {
+    Validate.isTrue(!needSplit);
+    String path = device + TsFileConstant.PATH_SEPARATOR + measurement;
+    this.nodes = new String[] {path};
+  }
+
   public boolean hasWildcard() {
     for (String node : nodes) {
       // *, ** , d*, *d*

Reply via email to