This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dc53d22b5c07a13eb7c32952fe77b8855c1dd573 Author: Minghui Liu <[email protected]> AuthorDate: Tue Nov 1 10:53:50 2022 +0800 implement planner --- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 15 +++ .../db/mpp/plan/planner/LogicalPlanBuilder.java | 16 +++ .../db/mpp/plan/planner/LogicalPlanVisitor.java | 18 +++ .../mpp/plan/planner/plan/node/PlanNodeType.java | 3 +- .../planner/plan/node/process/WindowSplitNode.java | 134 +++++++++++++++++++++ 5 files changed, 185 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 1f29d600c7..6b5fc36659 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -1233,6 +1233,21 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet()); analysis.setSourceExpressions(sourceExpressions); + // set transform + if (fetchWindowSetStatement.getFunctionName() != null) { + String functionName = fetchWindowSetStatement.getFunctionName(); + Set<Expression> sourceTransformExpressions = + sourceExpressions.stream() + .map( + expression -> + new FunctionExpression( + functionName, + new LinkedHashMap<>(), + Collections.singletonList(expression))) + .collect(Collectors.toSet()); + analysis.setSourceTransformExpressions(sourceTransformExpressions); + } + // set output List<ColumnHeader> columnHeaders = measurementPaths.stream() diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index 9b0c800c27..a212cb6326 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; @@ -772,6 +773,10 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder planTransform( Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) { + if (selectExpressions == null) { + return this; + } + boolean needTransform = false; for (Expression expression : selectExpressions) { if (ExpressionAnalyzer.checkIsNeedTransform(expression)) { @@ -884,6 +889,17 @@ public class LogicalPlanBuilder { return this; } + public LogicalPlanBuilder planWindowSplit( + GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) { + this.root = + new WindowSplitNode( + context.getQueryId().genPlanNodeId(), + this.getRoot(), + groupByTimeParameter, + samplingIndexes); + return this; + } + /** Meta Query* */ public LogicalPlanBuilder planTimeSeriesSchemaSource( PartialPath pathPattern, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index cbbc9d4131..ef1d9b8502 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -41,7 +41,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.db.mpp.plan.statement.StatementNode; import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; +import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement; +import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement; @@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -287,6 +290,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte return false; } + @Override + public PlanNode visitFetchWindowSet( + FetchWindowSetStatement fetchWindowSetStatement, MPPQueryContext context) { + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); + planBuilder + .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null) + .planTransform( + analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC) + .planWindowSplit( + fetchWindowSetStatement.getGroupByTimeParameter(), + fetchWindowSetStatement.getSamplingIndexes()); + + return planBuilder.getRoot(); + } + @Override public PlanNode visitCreateTimeseries( CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java index bad8725978..800459849b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java @@ -150,7 +150,8 @@ public enum PlanNodeType { ROLLBACK_PRE_DEACTIVATE_TEMPLATE_NODE((short) 60), DEACTIVATE_TEMPLATE_NODE((short) 61), INTO((short) 62), - DEVICE_VIEW_INTO((short) 63); + DEVICE_VIEW_INTO((short) 63), + WINDOW_SPLIT((short) 64); public static final int BYTES = Short.BYTES; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java new file mode 100644 index 0000000000..8ed9d404ef --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; + +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class WindowSplitNode extends SingleChildProcessNode { + + private final GroupByTimeParameter groupByTimeParameter; + private final List<Integer> samplingIndexes; + + public WindowSplitNode( + PlanNodeId id, + PlanNode child, + GroupByTimeParameter groupByTimeParameter, + List<Integer> samplingIndexes) { + super(id, child); + this.groupByTimeParameter = groupByTimeParameter; + this.samplingIndexes = samplingIndexes; + } + + public WindowSplitNode( + PlanNodeId id, GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) { + super(id); + this.groupByTimeParameter = groupByTimeParameter; + this.samplingIndexes = samplingIndexes; + } + + public GroupByTimeParameter getGroupByTimeParameter() { + return groupByTimeParameter; + } + + public List<Integer> getSamplingIndexes() { + return samplingIndexes; + } + + @Override + public PlanNode clone() { + return new WindowSplitNode(getPlanNodeId(), groupByTimeParameter, samplingIndexes); + } + + @Override + public List<String> getOutputColumnNames() { + return child.getOutputColumnNames(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.WINDOW_SPLIT.serialize(byteBuffer); + groupByTimeParameter.serialize(byteBuffer); + ReadWriteIOUtils.write(samplingIndexes.size(), byteBuffer); + for (Integer index : samplingIndexes) { + ReadWriteIOUtils.write(index, byteBuffer); + } + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.WINDOW_SPLIT.serialize(stream); + groupByTimeParameter.serialize(stream); + ReadWriteIOUtils.write(samplingIndexes.size(), stream); + for (Integer index : samplingIndexes) { + ReadWriteIOUtils.write(index, stream); + } + } + + public static WindowSplitNode deserialize(ByteBuffer byteBuffer) { + GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer); + + int listSize = ReadWriteIOUtils.readInt(byteBuffer); + List<Integer> samplingIndexes = new ArrayList<>(listSize); + while (listSize > 0) { + samplingIndexes.add(ReadWriteIOUtils.readInt(byteBuffer)); + listSize--; + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new WindowSplitNode(planNodeId, groupByTimeParameter, samplingIndexes); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + WindowSplitNode that = (WindowSplitNode) o; + return groupByTimeParameter.equals(that.groupByTimeParameter) + && samplingIndexes.equals(that.samplingIndexes); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), groupByTimeParameter, samplingIndexes); + } + + @Override + public String toString() { + return String.format("WindowSplitNode-%s", getPlanNodeId()); + } +}
