This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/addJoinNode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 190c40d411ebb6154b79b16989cc22a30ef3dda8 Author: Minghui Liu <[email protected]> AuthorDate: Wed Dec 20 11:05:34 2023 +0800 add LeftOuterTimeJoinNode --- .../plan/planner/plan/node/PlanNode.java | 1 + .../plan/planner/plan/node/PlanVisitor.java | 12 +++ .../plan/node/process/TwoChildProcessNode.java | 85 ++++++++++++++++++++++ .../node/process/join/FullOuterTimeJoinNode.java | 4 +- .../plan/node/process/join/InnerTimeJoinNode.java | 4 +- ...imeJoinNode.java => LeftOuterTimeJoinNode.java} | 51 +++++-------- 6 files changed, 122 insertions(+), 35 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java index 18f678974b7..6d13dde35ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java @@ -42,6 +42,7 @@ public abstract class PlanNode implements IConsensusRequest { protected static final int NO_CHILD_ALLOWED = 0; protected static final int ONE_CHILD = 1; + protected static final int TWO_CHILDREN = 2; protected static final int CHILD_COUNT_NO_LIMIT = -1; protected PlanNodeId id; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 013dd801871..ed7072a76eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -80,8 +80,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -209,6 +211,16 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } + // two child ----------------------------------------------------------------------------------- + + public R visitTwoChildProcess(TwoChildProcessNode node, C context) { + return visitPlan(node, context); + } + + public R visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, C context) { + return visitTwoChildProcess(node, context); + } + // multi child -------------------------------------------------------------------------------- public R visitMultiChildProcess(MultiChildProcessNode node, C context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TwoChildProcessNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TwoChildProcessNode.java new file mode 100644 index 00000000000..290725534f4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TwoChildProcessNode.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +public abstract class TwoChildProcessNode extends ProcessNode { + + protected PlanNode leftChild; + protected PlanNode rightChild; + + protected TwoChildProcessNode(PlanNodeId id) { + super(id); + } + + protected TwoChildProcessNode(PlanNodeId id, PlanNode leftChild, PlanNode rightChild) { + super(id); + this.leftChild = leftChild; + this.rightChild = rightChild; + } + + @Override + public List<PlanNode> getChildren() { + return ImmutableList.of(leftChild, rightChild); + } + + @Override + public void addChild(PlanNode child) { + if (leftChild == null) { + leftChild = child; + } else if (rightChild == null) { + rightChild = child; + } else { + throw new UnsupportedOperationException("This node doesn't support more than two children"); + } + } + + @Override + public int allowedChildCount() { + return TWO_CHILDREN; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TwoChildProcessNode that = (TwoChildProcessNode) o; + return Objects.equals(leftChild, that.leftChild) && Objects.equals(rightChild, that.rightChild); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), leftChild, rightChild); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java index 8a6173a780e..5b21ccaabe6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java @@ -121,11 +121,11 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { return false; } FullOuterTimeJoinNode that = (FullOuterTimeJoinNode) o; - return mergeOrder == that.mergeOrder && children.equals(that.children); + return mergeOrder == that.mergeOrder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), mergeOrder, children); + return Objects.hash(super.hashCode(), mergeOrder); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java index 4d559843b6e..42fda408817 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java @@ -116,11 +116,11 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { return false; } InnerTimeJoinNode that = (InnerTimeJoinNode) o; - return mergeOrder == that.mergeOrder && children.equals(that.children); + return mergeOrder == that.mergeOrder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), mergeOrder, children); + return Objects.hash(super.hashCode(), mergeOrder); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/LeftOuterTimeJoinNode.java similarity index 65% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/LeftOuterTimeJoinNode.java index 4d559843b6e..0fd83f4c04e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/LeftOuterTimeJoinNode.java @@ -23,30 +23,29 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; -public class InnerTimeJoinNode extends MultiChildProcessNode { +public class LeftOuterTimeJoinNode extends TwoChildProcessNode { // This parameter indicates the order when executing multiway merge sort. private final Ordering mergeOrder; - public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { - super(id, new ArrayList<>()); + public LeftOuterTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { + super(id); this.mergeOrder = mergeOrder; } - public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { - super(id, children); + public LeftOuterTimeJoinNode( + PlanNodeId id, Ordering mergeOrder, PlanNode leftChild, PlanNode rightChild) { + super(id, leftChild, rightChild); this.mergeOrder = mergeOrder; } @@ -56,52 +55,42 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new InnerTimeJoinNode(getPlanNodeId(), getMergeOrder()); - } - - @Override - public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new InnerTimeJoinNode( - new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - getMergeOrder(), - new ArrayList<>(children.subList(startIndex, endIndex))); + return new LeftOuterTimeJoinNode(getPlanNodeId(), getMergeOrder()); } @Override public List<String> getOutputColumnNames() { - return children.stream() - .map(PlanNode::getOutputColumnNames) - .flatMap(List::stream) - .distinct() - .collect(Collectors.toList()); + List<String> outputColumnNames = leftChild.getOutputColumnNames(); + outputColumnNames.addAll(rightChild.getOutputColumnNames()); + return outputColumnNames; } @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitInnerTimeJoin(this, context); + return visitor.visitLeftOuterTimeJoin(this, context); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.INNER_TIME_JOIN.serialize(byteBuffer); + PlanNodeType.LEFT_OUTER_TIME_JOIN.serialize(byteBuffer); ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.INNER_TIME_JOIN.serialize(stream); + PlanNodeType.LEFT_OUTER_TIME_JOIN.serialize(stream); ReadWriteIOUtils.write(mergeOrder.ordinal(), stream); } - public static InnerTimeJoinNode deserialize(ByteBuffer byteBuffer) { + public static LeftOuterTimeJoinNode deserialize(ByteBuffer byteBuffer) { Ordering mergeOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new InnerTimeJoinNode(planNodeId, mergeOrder); + return new LeftOuterTimeJoinNode(planNodeId, mergeOrder); } @Override public String toString() { - return "InnerTimeJoinNode-" + this.getPlanNodeId(); + return "LeftOuterTimeJoinNode-" + this.getPlanNodeId(); } @Override @@ -115,12 +104,12 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { if (!super.equals(o)) { return false; } - InnerTimeJoinNode that = (InnerTimeJoinNode) o; - return mergeOrder == that.mergeOrder && children.equals(that.children); + LeftOuterTimeJoinNode that = (LeftOuterTimeJoinNode) o; + return mergeOrder == that.mergeOrder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), mergeOrder, children); + return Objects.hash(super.hashCode(), mergeOrder); } }
