This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ce4eacfcbf806a44f93830df73e3f377043efea Author: Minghui Liu <[email protected]> AuthorDate: Mon Oct 17 22:20:17 2022 +0800 add IntoNode & DeviceViewNode --- .../planner/distribution/ExchangeNodeAdder.java | 6 +- .../plan/planner/distribution/SourceRewriter.java | 11 +-- .../mpp/plan/planner/plan/node/PlanNodeType.java | 4 +- .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +++ .../metedata/read/SchemaQueryOrderByHeatNode.java | 4 +- .../planner/plan/node/process/AggregationNode.java | 23 ++--- .../planner/plan/node/process/DeviceMergeNode.java | 32 +------ .../{LimitNode.java => DeviceViewIntoNode.java} | 97 +++++++++------------- .../planner/plan/node/process/DeviceViewNode.java | 19 +---- .../planner/plan/node/process/ExchangeNode.java | 42 +--------- .../plan/planner/plan/node/process/FillNode.java | 46 ++-------- .../plan/node/process/GroupByLevelNode.java | 29 +++---- .../planner/plan/node/process/GroupByTagNode.java | 17 +--- .../process/{OffsetNode.java => IntoNode.java} | 91 +++++++++----------- .../plan/planner/plan/node/process/LimitNode.java | 37 ++------- ...tiChildNode.java => MultiChildProcessNode.java} | 23 ++++- .../plan/planner/plan/node/process/OffsetNode.java | 36 ++------ .../planner/plan/node/process/ProjectNode.java | 40 +++------ ...iChildNode.java => SingleChildProcessNode.java} | 50 ++++++++--- .../node/process/SlidingWindowAggregationNode.java | 36 ++------ .../plan/planner/plan/node/process/SortNode.java | 29 ++----- .../planner/plan/node/process/TimeJoinNode.java | 17 +--- .../planner/plan/node/process/TransformNode.java | 29 +------ .../node/process/last/LastQueryCollectNode.java | 4 +- .../plan/node/process/last/LastQueryMergeNode.java | 4 +- .../plan/node/process/last/LastQueryNode.java | 4 +- 26 files changed, 237 insertions(+), 503 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java index 37c487b4ab..60c0a6a9b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java @@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; @@ -241,8 +241,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processMultiChildNode(node, context); } - private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) { - MultiChildNode newNode = (MultiChildNode) node.clone(); + private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) { + MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone(); List<PlanNode> visitedChildren = new ArrayList<>(); node.getChildren() .forEach( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 8df3a16804..e6bc4891f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -254,7 +254,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } private PlanNode processRawSeriesScan( - SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) { + SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) { List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context); if (sourceNodes.size() == 1) { return sourceNodes.get(0); @@ -407,8 +407,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return processRawMultiChildNode(node, context); } - private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) { - MultiChildNode root = (MultiChildNode) node.clone(); + private PlanNode processRawMultiChildNode( + MultiChildProcessNode node, DistributionPlanContext context) { + MultiChildProcessNode root = (MultiChildProcessNode) node.clone(); // Step 1: Get all source nodes. For the node which is not source, add it as the child of // current TimeJoinNode List<SourceNode> sources = new ArrayList<>(); @@ -468,7 +469,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } else { // We clone a TimeJoinNode from root to make the params to be consistent. // But we need to assign a new ID to it - MultiChildNode parentOfGroup = (MultiChildNode) root.clone(); + MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone(); parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); seriesScanNodes.forEach(parentOfGroup::addChild); root.addChild(parentOfGroup); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java index dd102898d5..8b7804bfd6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java @@ -140,7 +140,9 @@ public enum PlanNodeType { LOAD_TSFILE((short) 55), CONSTRUCT_SCHEMA_BLACK_LIST_NODE((short) 56), ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57), - GROUP_BY_TAG((short) 58); + GROUP_BY_TAG((short) 58), + INTO((short) 59), + DEVICE_VIEW_INTO((short) 60); public static final int BYTES = Short.BYTES; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java index 8b4155cb53..cad20d65de 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java @@ -44,12 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode; @@ -301,4 +303,12 @@ public abstract class PlanVisitor<R, C> { public R visitActivateTemplate(ActivateTemplateNode node, C context) { return visitPlan(node, context); } + + public R visitInto(IntoNode node, C context) { + return visitPlan(node, context); + } + + public R visitDeviceViewInto(DeviceViewIntoNode node, C context) { + return visitPlan(node, context); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java index 68cdf909cd..0c3d78ce6f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java @@ -23,14 +23,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; -public class SchemaQueryOrderByHeatNode extends MultiChildNode { +public class SchemaQueryOrderByHeatNode extends MultiChildProcessNode { public SchemaQueryOrderByHeatNode(PlanNodeId id) { super(id); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java index 837be95747..4448ca0c5c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java @@ -44,7 +44,7 @@ import java.util.stream.Collectors; * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the * final series aggregated result represented by TsBlock. */ -public class AggregationNode extends MultiChildNode { +public class AggregationNode extends MultiChildProcessNode { // The list of aggregate functions, each AggregateDescriptor will be output as one or two column // of @@ -74,8 +74,10 @@ public class AggregationNode extends MultiChildNode { List<AggregationDescriptor> aggregationDescriptorList, @Nullable GroupByTimeParameter groupByTimeParameter, Ordering scanOrder) { - this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder); - this.children = children; + super(id, children); + this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + this.groupByTimeParameter = groupByTimeParameter; + this.scanOrder = scanOrder; } public List<AggregationDescriptor> getAggregationDescriptorList() { @@ -91,21 +93,6 @@ public class AggregationNode extends MultiChildNode { return scanOrder; } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public PlanNode clone() { return new AggregationNode( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java index 141530d3f4..d3ec660e7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public class DeviceMergeNode extends MultiChildNode { +public class DeviceMergeNode extends MultiChildProcessNode { // The result output order, which could sort by device and time. // The size of this list is 2 and the first SortItem in this list has higher priority. @@ -43,16 +43,6 @@ public class DeviceMergeNode extends MultiChildNode { // the list of selected devices private final List<String> devices; - public DeviceMergeNode( - PlanNodeId id, - List<PlanNode> children, - OrderByParameter mergeOrderParameter, - List<String> devices) { - super(id, children); - this.mergeOrderParameter = mergeOrderParameter; - this.devices = devices; - } - public DeviceMergeNode( PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> devices) { super(id); @@ -68,21 +58,6 @@ public class DeviceMergeNode extends MultiChildNode { return devices; } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public PlanNode clone() { return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices()); @@ -147,13 +122,12 @@ public class DeviceMergeNode extends MultiChildNode { } DeviceMergeNode that = (DeviceMergeNode) o; return Objects.equals(mergeOrderParameter, that.mergeOrderParameter) - && Objects.equals(devices, that.devices) - && Objects.equals(children, that.children); + && Objects.equals(devices, that.devices); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), mergeOrderParameter, devices, children); + return Objects.hash(super.hashCode(), mergeOrderParameter, devices); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java similarity index 51% copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java index 9d3badac0e..987cbc18d2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -16,102 +16,74 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import com.google.common.collect.ImmutableList; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; -/** LimitNode is used to select top n result. It uses the default order of upstream nodes */ -public class LimitNode extends ProcessNode { +public class DeviceViewIntoNode extends SingleChildProcessNode { - private final int limit; + private final DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor; - private PlanNode child; - - public LimitNode(PlanNodeId id, int limit) { + public DeviceViewIntoNode( + PlanNodeId id, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) { super(id); - this.limit = limit; - } - - public LimitNode(PlanNodeId id, PlanNode child, int limit) { - this(id, limit); - this.child = child; - } - - public int getLimit() { - return limit; - } - - public PlanNode getChild() { - return child; - } - - public void setChild(PlanNode child) { - this.child = child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); + this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor; } - @Override - public void addChild(PlanNode child) { - this.child = child; + public DeviceViewIntoNode( + PlanNodeId id, PlanNode child, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) { + super(id, child); + this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor; } @Override public PlanNode clone() { - return new LimitNode(getPlanNodeId(), this.limit); - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; + return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor); } @Override public List<String> getOutputColumnNames() { - return child.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitLimit(this, context); + return ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() + .map(ColumnHeader::getColumnName) + .collect(Collectors.toList()); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.LIMIT.serialize(byteBuffer); - ReadWriteIOUtils.write(limit, byteBuffer); + PlanNodeType.INTO.serialize(byteBuffer); + this.deviceViewIntoPathDescriptor.serialize(byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.LIMIT.serialize(stream); - ReadWriteIOUtils.write(limit, stream); + PlanNodeType.INTO.serialize(stream); + this.deviceViewIntoPathDescriptor.serialize(stream); } - public static LimitNode deserialize(ByteBuffer byteBuffer) { - int limit = ReadWriteIOUtils.readInt(byteBuffer); + public static DeviceViewIntoNode deserialize(ByteBuffer byteBuffer) { + DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = + DeviceViewIntoPathDescriptor.deserialize(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LimitNode(planNodeId, limit); + return new DeviceViewIntoNode(planNodeId, deviceViewIntoPathDescriptor); } @Override - public String toString() { - return "LimitNode-" + this.getPlanNodeId(); + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitDeviceViewInto(this, context); } @Override @@ -125,12 +97,17 @@ public class LimitNode extends ProcessNode { if (!super.equals(o)) { return false; } - LimitNode that = (LimitNode) o; - return limit == that.limit && child.equals(that.child); + DeviceViewIntoNode intoNode = (DeviceViewIntoNode) o; + return deviceViewIntoPathDescriptor.equals(intoNode.deviceViewIntoPathDescriptor); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), limit, child); + return Objects.hash(super.hashCode(), deviceViewIntoPathDescriptor); + } + + @Override + public String toString() { + return "DeviceViewIntoNode-" + getPlanNodeId(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java index 034db109de..8517904d21 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java @@ -42,7 +42,7 @@ import java.util.Objects; * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will * contain n+1 columns where the new column is Device column. */ -public class DeviceViewNode extends MultiChildNode { +public class DeviceViewNode extends MultiChildProcessNode { // The result output order, which could sort by device and time. // The size of this list is 2 and the first SortItem in this list has higher priority. @@ -95,21 +95,6 @@ public class DeviceViewNode extends MultiChildNode { return deviceToMeasurementIndexesMap; } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public PlanNode clone() { return new DeviceViewNode( @@ -224,7 +209,6 @@ public class DeviceViewNode extends MultiChildNode { DeviceViewNode that = (DeviceViewNode) o; return mergeOrderParameter.equals(that.mergeOrderParameter) && devices.equals(that.devices) - && children.equals(that.children) && outputColumnNames.equals(that.outputColumnNames) && deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap); } @@ -235,7 +219,6 @@ public class DeviceViewNode extends MultiChildNode { super.hashCode(), mergeOrderParameter, devices, - children, outputColumnNames, deviceToMeasurementIndexesMap); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java index 780a0e629f..0a4b64f3de 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java @@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,8 +35,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -public class ExchangeNode extends PlanNode { - private PlanNode child; +public class ExchangeNode extends SingleChildProcessNode { + // The remoteSourceNode is used to record the remote source info for current ExchangeNode // It is not the child of current ExchangeNode private FragmentSinkNode remoteSourceNode; @@ -56,24 +54,11 @@ public class ExchangeNode extends PlanNode { super(id); } - @Override - public List<PlanNode> getChildren() { - if (this.child == null) { - return ImmutableList.of(); - } - return ImmutableList.of(child); - } - @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitExchange(this, context); } - @Override - public void addChild(PlanNode child) { - this.child = child; - } - @Override public PlanNode clone() { ExchangeNode node = new ExchangeNode(getPlanNodeId()); @@ -85,11 +70,6 @@ public class ExchangeNode extends PlanNode { return node; } - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public List<String> getOutputColumnNames() { return outputColumnNames; @@ -150,14 +130,6 @@ public class ExchangeNode extends PlanNode { } } - public PlanNode getChild() { - return child; - } - - public void setChild(PlanNode child) { - this.child = child; - } - @Override public String toString() { return String.format( @@ -182,10 +154,6 @@ public class ExchangeNode extends PlanNode { this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames()); } - public void cleanChildren() { - this.child = null; - } - public TEndPoint getUpstreamEndpoint() { return upstreamEndpoint; } @@ -210,15 +178,13 @@ public class ExchangeNode extends PlanNode { return false; } ExchangeNode that = (ExchangeNode) o; - return Objects.equals(child, that.child) - && Objects.equals(upstreamEndpoint, that.upstreamEndpoint) + return Objects.equals(upstreamEndpoint, that.upstreamEndpoint) && Objects.equals(upstreamInstanceId, that.upstreamInstanceId) && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId); } @Override public int hashCode() { - return Objects.hash( - super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId); + return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java index 5acb18a9de..d177d98d9c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java @@ -26,8 +26,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -35,48 +33,24 @@ import java.util.List; import java.util.Objects; /** FillNode is used to fill the empty field in one row. */ -public class FillNode extends ProcessNode { +public class FillNode extends SingleChildProcessNode { // descriptions of how null values are filled - private FillDescriptor fillDescriptor; - - private Ordering scanOrder; - - private PlanNode child; + private final FillDescriptor fillDescriptor; - public FillNode(PlanNodeId id) { - super(id); - } + private final Ordering scanOrder; public FillNode(PlanNodeId id, FillDescriptor fillDescriptor, Ordering scanOrder) { - this(id); + super(id); this.fillDescriptor = fillDescriptor; this.scanOrder = scanOrder; } public FillNode( PlanNodeId id, PlanNode child, FillDescriptor fillDescriptor, Ordering scanOrder) { - this(id, fillDescriptor, scanOrder); - this.child = child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - public PlanNode getChild() { - return child; - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; + super(id, child); + this.fillDescriptor = fillDescriptor; + this.scanOrder = scanOrder; } @Override @@ -127,14 +101,12 @@ public class FillNode extends ProcessNode { return false; } FillNode that = (FillNode) o; - return Objects.equals(fillDescriptor, that.fillDescriptor) - && Objects.equals(child, that.child) - && scanOrder == that.scanOrder; + return Objects.equals(fillDescriptor, that.fillDescriptor) && scanOrder == that.scanOrder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), fillDescriptor, child, scanOrder); + return Objects.hash(super.hashCode(), fillDescriptor, scanOrder); } public FillDescriptor getFillDescriptor() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java index 0fa39ff184..da7b8dc9c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java @@ -52,7 +52,7 @@ import java.util.stream.Collectors; * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one * bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1` */ -public class GroupByLevelNode extends MultiChildNode { +public class GroupByLevelNode extends MultiChildProcessNode { // The list of aggregate descriptors // each GroupByLevelDescriptor will be output as one or two column of result TsBlock @@ -87,21 +87,6 @@ public class GroupByLevelNode extends MultiChildNode { this.scanOrder = scanOrder; } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public PlanNode clone() { return new GroupByLevelNode( @@ -191,9 +176,15 @@ public class GroupByLevelNode extends MultiChildNode { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } GroupByLevelNode that = (GroupByLevelNode) o; return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors) && Objects.equals(groupByTimeParameter, that.groupByTimeParameter) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java index e0ef548508..c2e558bdc7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java @@ -43,7 +43,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; -public class GroupByTagNode extends MultiChildNode { +public class GroupByTagNode extends MultiChildProcessNode { private final List<String> tagKeys; private final Map<List<String>, List<CrossSeriesAggregationDescriptor>> @@ -87,16 +87,6 @@ public class GroupByTagNode extends MultiChildNode { this.outputColumnNames = Validate.notNull(outputColumnNames); } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - @Override public PlanNode clone() { // TODO: better do deep copy @@ -109,11 +99,6 @@ public class GroupByTagNode extends MultiChildNode { this.outputColumnNames); } - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public List<String> getOutputColumnNames() { List<String> ret = new ArrayList<>(tagKeys); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java similarity index 55% copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java index 0407f6d946..b68e862a7a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java @@ -16,96 +16,71 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import com.google.common.collect.ImmutableList; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; -/** - * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of - * upstream nodes - */ -public class OffsetNode extends ProcessNode { - - private final int offset; +public class IntoNode extends SingleChildProcessNode { - private PlanNode child; + private final IntoPathDescriptor intoPathDescriptor; - public OffsetNode(PlanNodeId id, int offset) { + public IntoNode(PlanNodeId id, IntoPathDescriptor intoPathDescriptor) { super(id); - this.offset = offset; + this.intoPathDescriptor = intoPathDescriptor; } - public OffsetNode(PlanNodeId id, PlanNode child, int offset) { - this(id, offset); - this.child = child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; + public IntoNode(PlanNodeId id, PlanNode child, IntoPathDescriptor intoPathDescriptor) { + super(id, child); + this.intoPathDescriptor = intoPathDescriptor; } @Override public PlanNode clone() { - return new OffsetNode(getPlanNodeId(), offset); + return new IntoNode(getPlanNodeId(), this.intoPathDescriptor); } @Override public List<String> getOutputColumnNames() { - return child.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitOffset(this, context); + return ColumnHeaderConstant.selectIntoColumnHeaders.stream() + .map(ColumnHeader::getColumnName) + .collect(Collectors.toList()); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.OFFSET.serialize(byteBuffer); - ReadWriteIOUtils.write(offset, byteBuffer); + PlanNodeType.INTO.serialize(byteBuffer); + this.intoPathDescriptor.serialize(byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.OFFSET.serialize(stream); - ReadWriteIOUtils.write(offset, stream); + PlanNodeType.INTO.serialize(stream); + this.intoPathDescriptor.serialize(stream); } - public static OffsetNode deserialize(ByteBuffer byteBuffer) { - int offset = ReadWriteIOUtils.readInt(byteBuffer); + public static IntoNode deserialize(ByteBuffer byteBuffer) { + IntoPathDescriptor intoPathDescriptor = IntoPathDescriptor.deserialize(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new OffsetNode(planNodeId, offset); + return new IntoNode(planNodeId, intoPathDescriptor); } - public PlanNode getChild() { - return child; - } - - public int getOffset() { - return offset; + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitInto(this, context); } @Override @@ -116,12 +91,20 @@ public class OffsetNode extends ProcessNode { if (o == null || getClass() != o.getClass()) { return false; } - OffsetNode that = (OffsetNode) o; - return offset == that.offset && child.equals(that.child); + if (!super.equals(o)) { + return false; + } + IntoNode intoNode = (IntoNode) o; + return intoPathDescriptor.equals(intoNode.intoPathDescriptor); } @Override public int hashCode() { - return Objects.hash(child, offset); + return Objects.hash(super.hashCode(), intoPathDescriptor); + } + + @Override + public String toString() { + return "IntoNode-" + getPlanNodeId(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java index 9d3badac0e..ae03e13f70 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java @@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,54 +31,29 @@ import java.util.List; import java.util.Objects; /** LimitNode is used to select top n result. It uses the default order of upstream nodes */ -public class LimitNode extends ProcessNode { +public class LimitNode extends SingleChildProcessNode { private final int limit; - private PlanNode child; - public LimitNode(PlanNodeId id, int limit) { super(id); this.limit = limit; } public LimitNode(PlanNodeId id, PlanNode child, int limit) { - this(id, limit); - this.child = child; + super(id, child); + this.limit = limit; } public int getLimit() { return limit; } - public PlanNode getChild() { - return child; - } - - public void setChild(PlanNode child) { - this.child = child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - @Override public PlanNode clone() { return new LimitNode(getPlanNodeId(), this.limit); } - @Override - public int allowedChildCount() { - return ONE_CHILD; - } - @Override public List<String> getOutputColumnNames() { return child.getOutputColumnNames(); @@ -126,11 +99,11 @@ public class LimitNode extends ProcessNode { return false; } LimitNode that = (LimitNode) o; - return limit == that.limit && child.equals(that.child); + return limit == that.limit; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), limit, child); + return Objects.hash(super.hashCode(), limit); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java similarity index 77% copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java index 9e699f370c..e23a86d0c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java @@ -26,16 +26,16 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -public abstract class MultiChildNode extends ProcessNode { +public abstract class MultiChildProcessNode extends ProcessNode { protected List<PlanNode> children; - public MultiChildNode(PlanNodeId id, List<PlanNode> children) { + public MultiChildProcessNode(PlanNodeId id, List<PlanNode> children) { super(id); this.children = children; } - public MultiChildNode(PlanNodeId id) { + public MultiChildProcessNode(PlanNodeId id) { super(id); this.children = new ArrayList<>(); } @@ -44,6 +44,21 @@ public abstract class MultiChildNode extends ProcessNode { this.children = children; } + @Override + public List<PlanNode> getChildren() { + return children; + } + + @Override + public void addChild(PlanNode child) { + this.children.add(child); + } + + @Override + public int allowedChildCount() { + return CHILD_COUNT_NO_LIMIT; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -55,7 +70,7 @@ public abstract class MultiChildNode extends ProcessNode { if (!super.equals(o)) { return false; } - MultiChildNode that = (MultiChildNode) o; + MultiChildProcessNode that = (MultiChildProcessNode) o; return children.equals(that.children); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java index 0407f6d946..64b912303b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java @@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -36,35 +34,18 @@ import java.util.Objects; * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of * upstream nodes */ -public class OffsetNode extends ProcessNode { +public class OffsetNode extends SingleChildProcessNode { private final int offset; - private PlanNode child; - public OffsetNode(PlanNodeId id, int offset) { super(id); this.offset = offset; } public OffsetNode(PlanNodeId id, PlanNode child, int offset) { - this(id, offset); - this.child = child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; + super(id, child); + this.offset = offset; } @Override @@ -100,10 +81,6 @@ public class OffsetNode extends ProcessNode { return new OffsetNode(planNodeId, offset); } - public PlanNode getChild() { - return child; - } - public int getOffset() { return offset; } @@ -116,12 +93,15 @@ public class OffsetNode extends ProcessNode { if (o == null || getClass() != o.getClass()) { return false; } + if (!super.equals(o)) { + return false; + } OffsetNode that = (OffsetNode) o; - return offset == that.offset && child.equals(that.child); + return offset == that.offset; } @Override public int hashCode() { - return Objects.hash(child, offset); + return Objects.hash(super.hashCode(), offset); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java index 3a9b5a7268..d5e71b6257 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java @@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -34,38 +32,20 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -public class ProjectNode extends ProcessNode { +public class ProjectNode extends SingleChildProcessNode { private final List<String> outputColumnNames; - private PlanNode child; - public ProjectNode(PlanNodeId id, List<String> outputColumnNames) { super(id); this.outputColumnNames = outputColumnNames; } public ProjectNode(PlanNodeId id, PlanNode child, List<String> outputColumnNames) { - super(id); - this.child = child; + super(id, child); this.outputColumnNames = outputColumnNames; } - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; - } - @Override public PlanNode clone() { return new ProjectNode(getPlanNodeId(), getOutputColumnNames()); @@ -112,15 +92,21 @@ public class ProjectNode extends ProcessNode { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } ProjectNode that = (ProjectNode) o; - return outputColumnNames.equals(that.outputColumnNames) && child.equals(that.child); + return outputColumnNames.equals(that.outputColumnNames); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), outputColumnNames, child); + return Objects.hash(super.hashCode(), outputColumnNames); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java similarity index 59% rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java index 9e699f370c..bd183cf3ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java @@ -22,26 +22,52 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; -import java.util.ArrayList; +import com.google.common.collect.ImmutableList; + import java.util.List; import java.util.Objects; -public abstract class MultiChildNode extends ProcessNode { +public abstract class SingleChildProcessNode extends ProcessNode { - protected List<PlanNode> children; + protected PlanNode child; - public MultiChildNode(PlanNodeId id, List<PlanNode> children) { + public SingleChildProcessNode(PlanNodeId id) { super(id); - this.children = children; } - public MultiChildNode(PlanNodeId id) { + public SingleChildProcessNode(PlanNodeId id, PlanNode child) { super(id); - this.children = new ArrayList<>(); + this.child = child; + } + + public PlanNode getChild() { + return child; + } + + public void setChild(PlanNode child) { + this.child = child; + } + + public void cleanChildren() { + this.child = null; } - public void setChildren(List<PlanNode> children) { - this.children = children; + @Override + public List<PlanNode> getChildren() { + if (this.child == null) { + return ImmutableList.of(); + } + return ImmutableList.of(child); + } + + @Override + public void addChild(PlanNode child) { + this.child = child; + } + + @Override + public int allowedChildCount() { + return ONE_CHILD; } @Override @@ -55,12 +81,12 @@ public abstract class MultiChildNode extends ProcessNode { if (!super.equals(o)) { return false; } - MultiChildNode that = (MultiChildNode) o; - return children.equals(that.children); + SingleChildProcessNode that = (SingleChildProcessNode) o; + return Objects.equals(child, that.child); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), children); + return Objects.hash(super.hashCode(), child); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java index a2f4935791..49d470a780 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java @@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,7 +36,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public class SlidingWindowAggregationNode extends ProcessNode { +public class SlidingWindowAggregationNode extends SingleChildProcessNode { // The list of aggregate functions, each AggregateDescriptor will be output as one column of // result TsBlock @@ -49,8 +47,6 @@ public class SlidingWindowAggregationNode extends ProcessNode { protected Ordering scanOrder; - private PlanNode child; - public SlidingWindowAggregationNode( PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList, @@ -68,8 +64,10 @@ public class SlidingWindowAggregationNode extends ProcessNode { List<AggregationDescriptor> aggregationDescriptorList, GroupByTimeParameter groupByTimeParameter, Ordering scanOrder) { - this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder); - this.child = child; + super(id, child); + this.aggregationDescriptorList = aggregationDescriptorList; + this.groupByTimeParameter = groupByTimeParameter; + this.scanOrder = scanOrder; } public List<AggregationDescriptor> getAggregationDescriptorList() { @@ -88,25 +86,6 @@ public class SlidingWindowAggregationNode extends ProcessNode { return scanOrder; } - public PlanNode getChild() { - return child; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; - } - @Override public PlanNode clone() { return new SlidingWindowAggregationNode( @@ -189,13 +168,12 @@ public class SlidingWindowAggregationNode extends ProcessNode { } SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o; return Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList) - && Objects.equals(groupByTimeParameter, that.groupByTimeParameter) - && Objects.equals(child, that.child); + && Objects.equals(groupByTimeParameter, that.groupByTimeParameter); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter, child); + return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter); } public String toString() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java index 283987eb97..e033fd163d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java @@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,9 +35,7 @@ import java.util.Objects; * In general, the parameter in sortNode should be pushed down to the upstream operators. In our * optimized logical query plan, the sortNode should not appear. */ -public class SortNode extends ProcessNode { - - private PlanNode child; +public class SortNode extends SingleChildProcessNode { private final Ordering sortOrder; @@ -49,29 +45,14 @@ public class SortNode extends ProcessNode { } public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) { - this(id, sortOrder); - this.child = child; + super(id, child); + this.sortOrder = sortOrder; } public Ordering getSortOrder() { return sortOrder; } - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; - } - - @Override - public int allowedChildCount() { - return ONE_CHILD; - } - @Override public PlanNode clone() { return new SortNode(getPlanNodeId(), sortOrder); @@ -117,11 +98,11 @@ public class SortNode extends ProcessNode { return false; } SortNode sortNode = (SortNode) o; - return child.equals(sortNode.child) && sortOrder == sortNode.sortOrder; + return sortOrder == sortNode.sortOrder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), child, sortOrder); + return Objects.hash(super.hashCode(), sortOrder); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java index 5685e6d95b..2598cd4e28 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of * TimeJoinOperator is sorted by timestamp */ -public class TimeJoinNode extends MultiChildNode { +public class TimeJoinNode extends MultiChildProcessNode { // This parameter indicates the order when executing multiway merge sort. private final Ordering mergeOrder; @@ -57,21 +57,6 @@ public class TimeJoinNode extends MultiChildNode { return mergeOrder; } - @Override - public List<PlanNode> getChildren() { - return children; - } - - @Override - public void addChild(PlanNode child) { - this.children.add(child); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; - } - @Override public PlanNode clone() { return new TimeJoinNode(getPlanNodeId(), getMergeOrder()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java index f6233e2e9d..112e53a01b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java @@ -27,8 +27,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableList; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,9 +36,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; -public class TransformNode extends ProcessNode { - - protected PlanNode childPlanNode; +public class TransformNode extends SingleChildProcessNode { protected final Expression[] outputExpressions; protected final boolean keepNull; @@ -52,13 +48,12 @@ public class TransformNode extends ProcessNode { public TransformNode( PlanNodeId id, - PlanNode childPlanNode, + PlanNode child, Expression[] outputExpressions, boolean keepNull, ZoneId zoneId, Ordering scanOrder) { - super(id); - this.childPlanNode = childPlanNode; + super(id, child); this.outputExpressions = outputExpressions; this.keepNull = keepNull; this.zoneId = zoneId; @@ -78,21 +73,6 @@ public class TransformNode extends ProcessNode { this.scanOrder = scanOrder; } - @Override - public final List<PlanNode> getChildren() { - return ImmutableList.of(childPlanNode); - } - - @Override - public final void addChild(PlanNode childPlanNode) { - this.childPlanNode = childPlanNode; - } - - @Override - public final int allowedChildCount() { - return ONE_CHILD; - } - @Override public final List<String> getOutputColumnNames() { if (outputColumnNames == null) { @@ -185,7 +165,6 @@ public class TransformNode extends ProcessNode { } TransformNode that = (TransformNode) o; return keepNull == that.keepNull - && childPlanNode.equals(that.childPlanNode) && Arrays.equals(outputExpressions, that.outputExpressions) && zoneId.equals(that.zoneId) && scanOrder == that.scanOrder; @@ -193,7 +172,7 @@ public class TransformNode extends ProcessNode { @Override public int hashCode() { - int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, zoneId, scanOrder); + int result = Objects.hash(super.hashCode(), keepNull, zoneId, scanOrder); result = 31 * result + Arrays.hashCode(outputExpressions); return result; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java index a81a5fb455..5f38f64a0a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java @@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import java.io.DataOutputStream; import java.io.IOException; @@ -32,7 +32,7 @@ import java.util.Objects; import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class LastQueryCollectNode extends MultiChildNode { +public class LastQueryCollectNode extends MultiChildProcessNode { public LastQueryCollectNode(PlanNodeId id) { super(id); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java index 482fc0110d..accb4dcbd7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java @@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; import java.io.DataOutputStream; @@ -33,7 +33,7 @@ import java.util.Objects; import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class LastQueryMergeNode extends MultiChildNode { +public class LastQueryMergeNode extends MultiChildProcessNode { // The result output order, which could sort by sensor and time. // The size of this list is 2 and the first SortItem in this list has higher priority. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java index 23b40ca829..b8e1cf422f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java @@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; @@ -38,7 +38,7 @@ import java.util.Objects; import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; -public class LastQueryNode extends MultiChildNode { +public class LastQueryNode extends MultiChildProcessNode { private final Filter timeFilter;
