This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/orderBySensor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 574153d8187b68fc30e82e46ddaf0aadd3fb5a8d Author: Minghui Liu <[email protected]> AuthorDate: Wed Jul 20 11:18:06 2022 +0800 add mergeOrders in LastQueryMergeNode --- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 5 +- .../db/mpp/plan/planner/LogicalPlanVisitor.java | 15 ++++-- .../plan/planner/distribution/SourceRewriter.java | 8 +++- .../planner/plan/node/process/DeviceMergeNode.java | 6 +-- .../planner/plan/node/process/DeviceViewNode.java | 2 +- .../plan/node/process/LastQueryMergeNode.java | 53 ++++++++++++++++------ .../db/mpp/plan/statement/crud/QueryStatement.java | 9 ++++ .../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 8 +++- .../mpp/plan/plan/distribution/LastQueryTest.java | 2 +- 9 files changed, 78 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index cb86295df9..5557e06bb7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -141,7 +141,8 @@ public class LogicalPlanBuilder { return this; } - public LogicalPlanBuilder planLast(Set<Expression> sourceExpressions, Filter globalTimeFilter) { + public LogicalPlanBuilder planLast( + Set<Expression> sourceExpressions, Filter globalTimeFilter, List<SortItem> mergeOrders) { List<PlanNode> sourceNodeList = new ArrayList<>(); for (Expression sourceExpression : sourceExpressions) { MeasurementPath selectPath = @@ -157,7 +158,7 @@ public class LogicalPlanBuilder { this.root = new LastQueryMergeNode( - context.getQueryId().genPlanNodeId(), sourceNodeList, globalTimeFilter); + context.getQueryId().genPlanNodeId(), sourceNodeList, globalTimeFilter, mergeOrders); return this; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index f35f51bcb4..b49801d547 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -92,7 +93,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte if (queryStatement.isLastQuery()) { return planBuilder - .planLast(analysis.getSourceExpressions(), analysis.getGlobalTimeFilter()) + .planLast( + analysis.getSourceExpressions(), + analysis.getGlobalTimeFilter(), + queryStatement.getSortItemList()) .getRoot(); } @@ -304,7 +308,9 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte public PlanNode visitLastPointFetch( LastPointFetchStatement lastPointFetchStatement, MPPQueryContext context) { LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); - return planBuilder.planLast(analysis.getSourceExpressions(), null).getRoot(); + return planBuilder + .planLast(analysis.getSourceExpressions(), null, Collections.emptyList()) + .getRoot(); } @Override @@ -456,7 +462,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) { PlanNode lastPlanNode = new LogicalPlanBuilder(context) - .planLast(analysis.getSourceExpressions(), analysis.getGlobalTimeFilter()) + .planLast( + analysis.getSourceExpressions(), + analysis.getGlobalTimeFilter(), + Collections.emptyList()) .getRoot(); planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 747b9933d1..3934659bb4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -231,7 +231,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) { LastQueryMergeNode mergeNode = new LastQueryMergeNode( - context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter()); + context.queryContext.getQueryId().genPlanNodeId(), + node.getPartitionTimeFilter(), + Collections.emptyList()); return processRawSeriesScan(node, context, mergeNode); } @@ -240,7 +242,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte AlignedLastQueryScanNode node, DistributionPlanContext context) { LastQueryMergeNode mergeNode = new LastQueryMergeNode( - context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter()); + context.queryContext.getQueryId().genPlanNodeId(), + node.getPartitionTimeFilter(), + Collections.emptyList()); return processRawSeriesScan(node, context, mergeNode); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java index 37a71fa58c..53318dfe21 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java @@ -37,7 +37,7 @@ import java.util.stream.Collectors; public class DeviceMergeNode extends MultiChildNode { // The result output order, which could sort by device and time. - // The size of this list is 2 and the first OrderBy in this list has higher priority. + // The size of this list is 2 and the first SortItem in this list has higher priority. private final List<SortItem> mergeOrders; // the list of selected devices @@ -45,15 +45,13 @@ public class DeviceMergeNode extends MultiChildNode { public DeviceMergeNode( PlanNodeId id, List<PlanNode> children, List<SortItem> mergeOrders, List<String> devices) { - super(id); - this.children = children; + super(id, children); this.mergeOrders = mergeOrders; this.devices = devices; } public DeviceMergeNode(PlanNodeId id, List<SortItem> mergeOrders, List<String> devices) { super(id); - this.children = new ArrayList<>(); this.mergeOrders = mergeOrders; this.devices = devices; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java index fb074122ba..fe7716cfa8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java @@ -45,7 +45,7 @@ import java.util.Objects; public class DeviceViewNode extends MultiChildNode { // The result output order, which could sort by device and time. - // The size of this list is 2 and the first OrderBy in this list has higher priority. + // The size of this list is 2 and the first SortItem in this list has higher priority. private final List<SortItem> mergeOrders; // The size devices and children should be the same. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java index 51a20ea238..6445fe88c4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.mpp.plan.statement.component.SortItem; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -39,21 +40,23 @@ import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQuerySca public class LastQueryMergeNode extends MultiChildNode { - // make sure child in list has been ordered by their sensor name - private List<PlanNode> children; - private final Filter timeFilter; - public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) { + // The result output order, which could sort by sensor and time. + // The size of this list is 2 and the first SortItem in this list has higher priority. + private final List<SortItem> mergeOrders; + + public LastQueryMergeNode(PlanNodeId id, Filter timeFilter, List<SortItem> mergeOrders) { super(id); - this.children = new ArrayList<>(); this.timeFilter = timeFilter; + this.mergeOrders = mergeOrders; } - public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter timeFilter) { - super(id); - this.children = children; + public LastQueryMergeNode( + PlanNodeId id, List<PlanNode> children, Filter timeFilter, List<SortItem> mergeOrders) { + super(id, children); this.timeFilter = timeFilter; + this.mergeOrders = mergeOrders; } @Override @@ -68,7 +71,7 @@ public class LastQueryMergeNode extends MultiChildNode { @Override public PlanNode clone() { - return new LastQueryMergeNode(getPlanNodeId(), timeFilter); + return new LastQueryMergeNode(getPlanNodeId(), timeFilter, mergeOrders); } @Override @@ -89,16 +92,22 @@ public class LastQueryMergeNode extends MultiChildNode { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } LastQueryMergeNode that = (LastQueryMergeNode) o; - return Objects.equals(children, that.children); + return Objects.equals(timeFilter, that.timeFilter) && mergeOrders.equals(that.mergeOrders); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), children); + return Objects.hash(super.hashCode(), timeFilter, mergeOrders); } @Override @@ -115,6 +124,10 @@ public class LastQueryMergeNode extends MultiChildNode { ReadWriteIOUtils.write((byte) 1, byteBuffer); timeFilter.serialize(byteBuffer); } + ReadWriteIOUtils.write(mergeOrders.size(), byteBuffer); + for (SortItem mergeOrder : mergeOrders) { + mergeOrder.serialize(byteBuffer); + } } @Override @@ -126,6 +139,10 @@ public class LastQueryMergeNode extends MultiChildNode { ReadWriteIOUtils.write((byte) 1, stream); timeFilter.serialize(stream); } + ReadWriteIOUtils.write(mergeOrders.size(), stream); + for (SortItem mergeOrder : mergeOrders) { + mergeOrder.serialize(stream); + } } public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) { @@ -133,8 +150,14 @@ public class LastQueryMergeNode extends MultiChildNode { if (ReadWriteIOUtils.readByte(byteBuffer) == 1) { timeFilter = FilterFactory.deserialize(byteBuffer); } + int mergeOrdersSize = ReadWriteIOUtils.readInt(byteBuffer); + List<SortItem> mergeOrders = new ArrayList<>(mergeOrdersSize); + while (mergeOrdersSize > 0) { + mergeOrders.add(SortItem.deserialize(byteBuffer)); + mergeOrdersSize--; + } PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryMergeNode(planNodeId, timeFilter); + return new LastQueryMergeNode(planNodeId, timeFilter, mergeOrders); } public void setChildren(List<PlanNode> children) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java index aa4f01cd68..9067f7d55e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java @@ -36,8 +36,10 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn; import org.apache.iotdb.db.mpp.plan.statement.component.ResultSetFormat; import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent; +import org.apache.iotdb.db.mpp.plan.statement.component.SortItem; import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition; +import java.util.Collections; import java.util.List; /** @@ -235,6 +237,13 @@ public class QueryStatement extends Statement { return orderByComponent.getTimeOrder(); } + public List<SortItem> getSortItemList() { + if (orderByComponent == null) { + return Collections.emptyList(); + } + return orderByComponent.getSortItemList(); + } + public void semanticCheck() { if (isAggregationQuery()) { if (disableAlign()) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java index e098262c81..7f8ade3f5a 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java @@ -108,7 +108,7 @@ public class QueryLogicalPlanUtil { /* Last Query */ static { - String sql = "SELECT last * FROM root.sg.** WHERE time > 100"; + String sql = "SELECT last * FROM root.sg.** WHERE time > 100 ORDER BY timeseries ASC"; QueryId queryId = new QueryId("test"); List<PlanNode> sourceNodeList = new ArrayList<>(); @@ -140,7 +140,11 @@ public class QueryLogicalPlanUtil { queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s2"))); LastQueryMergeNode lastQueryMergeNode = - new LastQueryMergeNode(queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100)); + new LastQueryMergeNode( + queryId.genPlanNodeId(), + sourceNodeList, + TimeFilter.gt(100), + Collections.singletonList(new SortItem(SortKey.TIMESERIES, Ordering.ASC))); querySQLs.add(sql); sqlToPlanMap.put(sql, lastQueryMergeNode); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java index 2a3765dded..73079d21f9 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java @@ -112,7 +112,7 @@ public class LastQueryTest { for (String path : paths) { expressions.add(new TimeSeriesOperand(new MeasurementPath(path))); } - PlanNode root = builder.planLast(expressions, null).getRoot(); + PlanNode root = builder.planLast(expressions, null, Collections.emptyList()).getRoot(); return new LogicalQueryPlan(context, root); } }
