This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/addJoinNode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4240a1450df593e5c852b0db780b65d26b959114 Author: Minghui Liu <[email protected]> AuthorDate: Wed Dec 20 10:28:13 2023 +0800 add InnerTimeJoinNode --- .../plan/planner/plan/node/PlanNodeType.java | 12 +++++--- .../plan/planner/plan/node/PlanVisitor.java | 7 ++++- .../node/process/join/FullOuterTimeJoinNode.java | 6 ++-- ...terTimeJoinNode.java => InnerTimeJoinNode.java} | 33 +++++++++------------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 4c59619a248..17e9af3c9a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -68,7 +68,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -82,6 +81,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -118,7 +119,7 @@ public enum PlanNodeType { LIMIT((short) 6), OFFSET((short) 7), SORT((short) 8), - TIME_JOIN((short) 9), + FULL_OUTER_TIME_JOIN((short) 9), FRAGMENT_SINK((short) 10), SERIES_SCAN((short) 11), SERIES_AGGREGATE_SCAN((short) 12), @@ -196,7 +197,9 @@ public enum PlanNodeType { PIPE_ENRICHED_DELETE_DATA((short) 84), PIPE_ENRICHED_WRITE_SCHEMA((short) 85), PIPE_ENRICHED_DELETE_SCHEMA((short) 86), - ; + + INNER_TIME_JOIN((short) 87), + LEFT_OUTER_TIME_JOIN((short) 88); public static final int BYTES = Short.BYTES; @@ -417,7 +420,8 @@ public enum PlanNodeType { return PipeEnrichedWriteSchemaNode.deserialize(buffer); case 86: return PipeEnrichedConfigSchemaNode.deserialize(buffer); - + case 87: + return InnerTimeJoinNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 484afe601fe..013dd801871 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -65,7 +65,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -81,6 +80,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -238,6 +239,10 @@ public abstract class PlanVisitor<R, C> { return visitMultiChildProcess(node, context); } + public R visitInnerTimeJoin(InnerTimeJoinNode node, C context) { + return visitMultiChildProcess(node, context); + } + public R visitLastQuery(LastQueryNode node, C context) { return visitMultiChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java index 57134fbb2a3..8a6173a780e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java @@ -88,13 +88,13 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.TIME_JOIN.serialize(byteBuffer); + PlanNodeType.FULL_OUTER_TIME_JOIN.serialize(byteBuffer); ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.TIME_JOIN.serialize(stream); + PlanNodeType.FULL_OUTER_TIME_JOIN.serialize(stream); ReadWriteIOUtils.write(mergeOrder.ordinal(), stream); } @@ -106,7 +106,7 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { @Override public String toString() { - return "TimeJoinNode-" + this.getPlanNodeId(); + return "FullOuterTimeJoinNode-" + this.getPlanNodeId(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java similarity index 76% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java index 57134fbb2a3..4d559843b6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -34,23 +35,17 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -/** - * This node is responsible for join two or more TsBlock. - * - * <p>The join algorithm is like <b>full outer join</b> by timestamp column. It will join two or - * more TsBlock by Timestamp column. The output result of TimeJoinOperator is sorted by timestamp. - */ -public class FullOuterTimeJoinNode extends MultiChildProcessNode { +public class InnerTimeJoinNode extends MultiChildProcessNode { // This parameter indicates the order when executing multiway merge sort. private final Ordering mergeOrder; - public FullOuterTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { + public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { super(id, new ArrayList<>()); this.mergeOrder = mergeOrder; } - public FullOuterTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { + public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { super(id, children); this.mergeOrder = mergeOrder; } @@ -61,12 +56,12 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new FullOuterTimeJoinNode(getPlanNodeId(), getMergeOrder()); + return new InnerTimeJoinNode(getPlanNodeId(), getMergeOrder()); } @Override public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new FullOuterTimeJoinNode( + return new InnerTimeJoinNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), getMergeOrder(), new ArrayList<>(children.subList(startIndex, endIndex))); @@ -83,30 +78,30 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitFullOuterTimeJoin(this, context); + return visitor.visitInnerTimeJoin(this, context); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.TIME_JOIN.serialize(byteBuffer); + PlanNodeType.INNER_TIME_JOIN.serialize(byteBuffer); ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.TIME_JOIN.serialize(stream); + PlanNodeType.INNER_TIME_JOIN.serialize(stream); ReadWriteIOUtils.write(mergeOrder.ordinal(), stream); } - public static FullOuterTimeJoinNode deserialize(ByteBuffer byteBuffer) { + public static InnerTimeJoinNode deserialize(ByteBuffer byteBuffer) { Ordering mergeOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new FullOuterTimeJoinNode(planNodeId, mergeOrder); + return new InnerTimeJoinNode(planNodeId, mergeOrder); } @Override public String toString() { - return "TimeJoinNode-" + this.getPlanNodeId(); + return "InnerTimeJoinNode-" + this.getPlanNodeId(); } @Override @@ -120,7 +115,7 @@ public class FullOuterTimeJoinNode extends MultiChildProcessNode { if (!super.equals(o)) { return false; } - FullOuterTimeJoinNode that = (FullOuterTimeJoinNode) o; + InnerTimeJoinNode that = (InnerTimeJoinNode) o; return mergeOrder == that.mergeOrder && children.equals(that.children); }
