This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83ebf5b1a9c Support order by time for Agg align by device query with
cross region data
83ebf5b1a9c is described below
commit 83ebf5b1a9cfa525ef0b30b2a97f6b22ea566564
Author: Weihao Li <[email protected]>
AuthorDate: Mon Jun 16 21:57:04 2025 +0800
Support order by time for Agg align by device query with cross region data
---
.../plan/planner/OperatorTreeGenerator.java | 4 ++--
.../plan/planner/distribution/SourceRewriter.java | 14 +++++++++++++-
.../distribution/AggregationAlignByDeviceTest.java | 19 +++++++++++++++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 59f717e2860..bdc171a98a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -1189,8 +1189,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
if (!sortItemList.get(0).getSortKey().equalsIgnoreCase("Device")) {
- throw new IllegalArgumentException(
- "Only order by device align by device support
AggregationMergeSortNode.");
+ throw new IllegalStateException(
+ "AggregationMergeSortNode without order by device should not appear
here");
}
boolean timeAscending = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index b2a43684a4b..34c0ff27b10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -82,6 +82,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import com.google.common.collect.ImmutableList;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -331,12 +332,23 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
}
+ OrderByParameter orderByParameter;
+ List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
+ if (!sortItemList.get(0).getSortKey().equalsIgnoreCase("Device")) {
+ // Reaching here means the DeviceView is ordered by time and has only one device,
+ // so it is safe to transform order by time into order by device.
+ // SortItems here will only be Time and Device, see planDeviceView().
+ orderByParameter =
+ new OrderByParameter(ImmutableList.of(sortItemList.get(1),
sortItemList.get(0)));
+ } else {
+ orderByParameter = node.getMergeOrderParameter();
+ }
boolean hasGroupBy =
analysis.getGroupByTimeParameter() != null ||
analysis.hasGroupByParameter();
AggregationMergeSortNode mergeSortNode =
new AggregationMergeSortNode(
context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
+ orderByParameter,
node.getOutputColumnNames(),
deviceViewOutputExpressions,
hasGroupBy);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
index 185db3a9cf6..ef96700c22b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -39,10 +39,13 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftO
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import com.google.common.collect.ImmutableList;
import org.junit.Test;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -465,5 +468,21 @@ public class AggregationAlignByDeviceTest {
"last_value(root.sg.d1.s2)",
"max_time(root.sg.d1.s2)"));
}
+
+ // order by time
+ sql = "select last_value(s1),last_value(s2)from root.sg.d1 order by time
align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot =
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertTrue(firstFiRoot instanceof AggregationMergeSortNode);
+ assertTrue(firstFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ List<SortItem> sortItemList =
+ ((AggregationMergeSortNode)
firstFiRoot).getMergeOrderParameter().getSortItemList();
+ assertEquals(sortItemList.get(0).getSortKey().toLowerCase(), "device");
+ assertEquals(sortItemList.get(1).getSortKey().toLowerCase(), "time");
}
}