This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/addJoinNode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8799dbca962ac4c517210246371e02ac9e053a67 Author: Minghui Liu <[email protected]> AuthorDate: Wed Dec 20 10:16:11 2023 +0800 rename TimeJoinNode to FullOuterTimeJoinNode --- .../plan/planner/LogicalPlanBuilder.java | 5 +- .../plan/planner/OperatorTreeGenerator.java | 7 +- .../planner/distribution/ExchangeNodeAdder.java | 4 +- .../plan/planner/distribution/SourceRewriter.java | 25 +++-- .../plan/planner/plan/node/PlanGraphPrinter.java | 4 +- .../plan/planner/plan/node/PlanNodeType.java | 4 +- .../plan/planner/plan/node/PlanVisitor.java | 4 +- ...imeJoinNode.java => FullOuterTimeJoinNode.java} | 25 ++--- .../plan/optimization/TestPlanBuilder.java | 8 +- .../plan/plan/FragmentInstanceSerdeTest.java | 13 +-- .../queryengine/plan/plan/PipelineBuilderTest.java | 113 ++++++++++++--------- .../plan/plan/QueryLogicalPlanUtil.java | 62 +++++------ .../distribution/AggregationDistributionTest.java | 27 +++-- .../AlignByDeviceOrderByLimitOffsetTest.java | 4 +- .../plan/distribution/AlignedByDeviceTest.java | 62 +++++------ .../plan/node/process/DeviceViewNodeSerdeTest.java | 12 ++- .../plan/node/process/ExchangeNodeSerdeTest.java | 7 +- .../plan/plan/node/process/FillNodeSerdeTest.java | 7 +- .../plan/node/process/FilterNodeSerdeTest.java | 7 +- ...st.java => FullOuterTimeJoinNodeSerdeTest.java} | 15 +-- 20 files changed, 228 insertions(+), 187 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 047fab98504..f660307fa1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; @@ -70,7 +71,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -728,7 +728,8 @@ public class LogicalPlanBuilder { if (sourceNodes.size() == 1) { tmpNode = sourceNodes.get(0); } else { - tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes); + tmpNode = + new FullOuterTimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes); } return tmpNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index e184bf8ff18..c78ddf287aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -171,6 +171,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -182,7 +183,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -1871,7 +1871,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP @Deprecated @Override - public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) { + public Operator visitFullOuterTimeJoin( + FullOuterTimeJoinNode node, LocalExecutionPlanContext context) { List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context); OperatorContext operatorContext = context @@ -2535,7 +2536,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP dataTypes.add(((SeriesScanNode) child).getSeriesPath().getSeriesType()); } else if (child instanceof AlignedSeriesScanNode) { dataTypes.add(((AlignedSeriesScanNode) child).getAlignedPath().getSeriesType()); - } else if (child instanceof TimeJoinNode) { + } else if (child instanceof FullOuterTimeJoinNode) { dataTypes.addAll(getOutputColumnTypesOfTimeJoinNode(child)); } else { LOGGER.error( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index e95aa68720c..c159ae65fdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMerg import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -46,7 +47,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -238,7 +238,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { } @Override - public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) { + public PlanNode visitFullOuterTimeJoin(FullOuterTimeJoinNode node, NodeGroupContext context) { return processMultiChildNode(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index df0dc913d04..f6cca28c036 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -47,7 +48,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -462,17 +462,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue @Override public List<PlanNode> visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) { - TimeJoinNode timeJoinNode = - new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); - return processRawSeriesScan(node, context, timeJoinNode); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode( + context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + return processRawSeriesScan(node, context, fullOuterTimeJoinNode); } @Override public List<PlanNode> visitAlignedSeriesScan( AlignedSeriesScanNode node, DistributionPlanContext context) { - TimeJoinNode timeJoinNode = - new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); - return processRawSeriesScan(node, context, timeJoinNode); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode( + context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + return processRawSeriesScan(node, context, fullOuterTimeJoinNode); } @Override @@ -683,7 +685,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } @Override - public List<PlanNode> visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) { + public List<PlanNode> visitFullOuterTimeJoin( + FullOuterTimeJoinNode node, DistributionPlanContext context) { // Although some logic is similar between Aggregation and RawDataQuery, // we still use separate method to process the distribution planning now // to make the planning procedure more clear @@ -693,7 +696,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return Collections.singletonList(processRawMultiChildNode(node, context, true)); } - // Only `visitTimeJoin` and `visitLastQuery` invoke this method + // Only `visitFullOuterTimeJoin` and `visitLastQuery` invoke this method private PlanNode processRawMultiChildNode( MultiChildProcessNode node, DistributionPlanContext context, boolean isTimeJoin) { MultiChildProcessNode root = (MultiChildProcessNode) node.clone(); @@ -785,7 +788,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return sourceGroup; } - private boolean containsAggregationSource(TimeJoinNode node) { + private boolean containsAggregationSource(FullOuterTimeJoinNode node) { for (PlanNode child : node.getChildren()) { if (child instanceof SeriesAggregationScanNode || child instanceof AlignedSeriesAggregationScanNode) { @@ -807,7 +810,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } private List<PlanNode> planAggregationWithTimeJoin( - TimeJoinNode root, DistributionPlanContext context) { + FullOuterTimeJoinNode root, DistributionPlanContext context) { Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup; // construct newRoot diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index ef103fdb730..b2f0fe9d23d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -41,7 +42,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -326,7 +326,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter } @Override - public List<String> visitTimeJoin(TimeJoinNode node, GraphContext context) { + public List<String> visitFullOuterTimeJoin(FullOuterTimeJoinNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("TimeJoin-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Order: %s", node.getMergeOrder())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index a17ea3493fd..56621c04ac2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -68,6 +68,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -79,7 +80,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -270,7 +270,7 @@ public enum PlanNodeType { case 8: return SortNode.deserialize(buffer); case 9: - return TimeJoinNode.deserialize(buffer); + return FullOuterTimeJoinNode.deserialize(buffer); case 11: return SeriesScanNode.deserialize(buffer); case 12: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index e8f6b149f0b..ff8cc616e8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -65,6 +65,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceView import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; @@ -78,7 +79,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChil import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; @@ -234,7 +234,7 @@ public abstract class PlanVisitor<R, C> { return visitMultiChildProcess(node, context); } - public R visitTimeJoin(TimeJoinNode node, C context) { + public R visitFullOuterTimeJoin(FullOuterTimeJoinNode node, C context) { return visitMultiChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FullOuterTimeJoinNode.java similarity index 80% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TimeJoinNode.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FullOuterTimeJoinNode.java index b6375d6b0f9..313fccc845e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FullOuterTimeJoinNode.java @@ -34,21 +34,22 @@ import java.util.Objects; import java.util.stream.Collectors; /** - * This node is responsible for join two or more TsBlock. The join algorithm is like outer join by - * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of - * TimeJoinOperator is sorted by timestamp + * This node is responsible for join two or more TsBlock. + * + * <p>The join algorithm is like <b>full outer join</b> by timestamp column. It will join two or + * more TsBlock by Timestamp column. The output result of TimeJoinOperator is sorted by timestamp. */ -public class TimeJoinNode extends MultiChildProcessNode { +public class FullOuterTimeJoinNode extends MultiChildProcessNode { // This parameter indicates the order when executing multiway merge sort. private final Ordering mergeOrder; - public TimeJoinNode(PlanNodeId id, Ordering mergeOrder) { + public FullOuterTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { super(id, new ArrayList<>()); this.mergeOrder = mergeOrder; } - public TimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { + public FullOuterTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { super(id, children); this.mergeOrder = mergeOrder; } @@ -59,12 +60,12 @@ public class TimeJoinNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new TimeJoinNode(getPlanNodeId(), getMergeOrder()); + return new FullOuterTimeJoinNode(getPlanNodeId(), getMergeOrder()); } @Override public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new TimeJoinNode( + return new FullOuterTimeJoinNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), getMergeOrder(), new ArrayList<>(children.subList(startIndex, endIndex))); @@ -81,7 +82,7 @@ public class TimeJoinNode extends MultiChildProcessNode { @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitTimeJoin(this, context); + return visitor.visitFullOuterTimeJoin(this, context); } @Override @@ -96,10 +97,10 @@ public class TimeJoinNode extends MultiChildProcessNode { ReadWriteIOUtils.write(mergeOrder.ordinal(), stream); } - public static TimeJoinNode deserialize(ByteBuffer byteBuffer) { + public static FullOuterTimeJoinNode deserialize(ByteBuffer byteBuffer) { Ordering mergeOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new TimeJoinNode(planNodeId, mergeOrder); + return new FullOuterTimeJoinNode(planNodeId, mergeOrder); } @Override @@ -118,7 +119,7 @@ public class TimeJoinNode extends MultiChildProcessNode { if (!super.equals(o)) { return false; } - TimeJoinNode that = (TimeJoinNode) o; + FullOuterTimeJoinNode that = (FullOuterTimeJoinNode) o; return mergeOrder == that.mergeOrder && children.equals(that.children); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java index c3684bc9140..5c92904f6f5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java @@ -30,11 +30,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInje import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -199,7 +199,8 @@ public class TestPlanBuilder { } this.root = - new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes); + new FullOuterTimeJoinNode( + new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes); return this; } @@ -213,7 +214,8 @@ public class TestPlanBuilder { planId++; } this.root = - new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes); + new FullOuterTimeJoinNode( + new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes); return this; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FragmentInstanceSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FragmentInstanceSerdeTest.java index add289658ec..cde9c9fec06 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FragmentInstanceSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/FragmentInstanceSerdeTest.java @@ -37,9 +37,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -136,7 +136,8 @@ public class FragmentInstanceSerdeTest { OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100); LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100); - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.DESC); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.DESC); SeriesScanNode seriesScanNode1 = new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2")); seriesScanNode1.setScanOrder(Ordering.DESC); @@ -148,10 +149,10 @@ public class FragmentInstanceSerdeTest { seriesScanNode3.setScanOrder(Ordering.DESC); // build tree - timeJoinNode.addChild(seriesScanNode1); - timeJoinNode.addChild(seriesScanNode2); - timeJoinNode.addChild(seriesScanNode3); - limitNode.addChild(timeJoinNode); + fullOuterTimeJoinNode.addChild(seriesScanNode1); + fullOuterTimeJoinNode.addChild(seriesScanNode2); + fullOuterTimeJoinNode.addChild(seriesScanNode3); + limitNode.addChild(fullOuterTimeJoinNode); offsetNode.addChild(limitNode); return offsetNode; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java index d07ea63d43c..8fb2e204a48 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java @@ -46,8 +46,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; @@ -89,21 +89,22 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder1() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(1); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); assertEquals(0, context.getPipelineNumber()); assertEquals(4, childrenOperator.size()); - assertEquals(4, timeJoinNode.getChildren().size()); + assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); for (int i = 0; i < 4; i++) { assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); - assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); assertEquals( String.format("root.sg.d%d.s1", i), - timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); } // Validate the number exchange operator @@ -120,36 +121,43 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder2() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(2); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); // The number of pipeline is 1, since parent pipeline hasn't joined assertEquals(1, context.getPipelineNumber()); // Validate the first pipeline assertEquals(3, childrenOperator.size()); - assertEquals(3, timeJoinNode.getChildren().size()); + assertEquals(3, fullOuterTimeJoinNode.getChildren().size()); for (int i = 0; i < 2; i++) { assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass()); - assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); } assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); // Validate the changes of node structure - assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); - assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); - assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass()); + assertEquals( + "root.sg.d0.s1", fullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals( + "root.sg.d1.s1", fullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + assertEquals( + FullOuterTimeJoinNode.class, fullOuterTimeJoinNode.getChildren().get(2).getClass()); // Validate the second pipeline - TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2); - assertEquals(2, subTimeJoinNode.getChildren().size()); + FullOuterTimeJoinNode subFullOuterTimeJoinNode = + (FullOuterTimeJoinNode) fullOuterTimeJoinNode.getChildren().get(2); + assertEquals(2, subFullOuterTimeJoinNode.getChildren().size()); assertEquals( - "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + "root.sg.d2.s1", + subFullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); assertEquals( - "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + "root.sg.d3.s1", + subFullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); // Validate the number exchange operator assertEquals(1, context.getExchangeSumNum()); @@ -167,12 +175,13 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder3() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(3); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); // The number of pipeline is 2, since parent pipeline hasn't joined assertEquals(2, context.getPipelineNumber()); @@ -183,24 +192,30 @@ public class PipelineBuilderTest { assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass()); // Validate the changes of node structure - assertEquals(3, timeJoinNode.getChildren().size()); - assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); - assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); - assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass()); + assertEquals(3, fullOuterTimeJoinNode.getChildren().size()); + assertEquals( + "root.sg.d0.s1", fullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + assertEquals( + "root.sg.d1.s1", fullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + assertEquals( + FullOuterTimeJoinNode.class, fullOuterTimeJoinNode.getChildren().get(2).getClass()); // Validate the second pipeline ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1); assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId()); // Validate the third pipeline - TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2); - assertEquals(2, subTimeJoinNode.getChildren().size()); + FullOuterTimeJoinNode subFullOuterTimeJoinNode = + (FullOuterTimeJoinNode) fullOuterTimeJoinNode.getChildren().get(2); + assertEquals(2, subFullOuterTimeJoinNode.getChildren().size()); assertEquals( - "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); + "root.sg.d2.s1", + subFullOuterTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0)); assertEquals( - "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); + "root.sg.d3.s1", + subFullOuterTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0)); ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2); - assertEquals(exchangeOperator2.getSourceId(), subTimeJoinNode.getPlanNodeId()); + assertEquals(exchangeOperator2.getSourceId(), subFullOuterTimeJoinNode.getPlanNodeId()); // Validate the number exchange operator assertEquals(2, context.getExchangeSumNum()); @@ -221,12 +236,13 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder4() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(4); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); // The number of pipeline is 3, since parent pipeline hasn't joined assertEquals(3, context.getPipelineNumber()); @@ -238,12 +254,12 @@ public class PipelineBuilderTest { assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass()); // Validate the changes of node structure - assertEquals(4, timeJoinNode.getChildren().size()); + assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); for (int i = 0; i < 4; i++) { - assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); assertEquals( String.format("root.sg.d%d.s1", i), - timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); } // Validate the second pipeline @@ -279,12 +295,13 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder5() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(5); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); // The number of pipeline is 4, since parent pipeline hasn't joined assertEquals(4, context.getPipelineNumber()); @@ -295,12 +312,12 @@ public class PipelineBuilderTest { } // Validate the changes of node structure - assertEquals(4, timeJoinNode.getChildren().size()); + assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); for (int i = 0; i < 4; i++) { - assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); assertEquals( String.format("root.sg.d%d.s1", i), - timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); } // Validate the second pipeline @@ -340,12 +357,13 @@ public class PipelineBuilderTest { @Test public void testConsumeAllChildrenPipelineBuilder6() throws IllegalPathException { TypeProvider typeProvider = new TypeProvider(); - TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4); + FullOuterTimeJoinNode fullOuterTimeJoinNode = initTimeJoinNode(typeProvider, 4); LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider); context.setDegreeOfParallelism(6); List<Operator> childrenOperator = - operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context); + operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker( + fullOuterTimeJoinNode, context); // The number of pipeline is 4, since parent pipeline hasn't joined assertEquals(4, context.getPipelineNumber()); @@ -356,12 +374,12 @@ public class PipelineBuilderTest { } // Validate the changes of node structure - assertEquals(4, timeJoinNode.getChildren().size()); + assertEquals(4, fullOuterTimeJoinNode.getChildren().size()); for (int i = 0; i < 4; i++) { - assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass()); + assertEquals(SeriesScanNode.class, fullOuterTimeJoinNode.getChildren().get(i).getClass()); assertEquals( String.format("root.sg.d%d.s1", i), - timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); + fullOuterTimeJoinNode.getChildren().get(i).getOutputColumnNames().get(0)); } // Validate the second pipeline @@ -880,18 +898,19 @@ public class PipelineBuilderTest { * @param childNum the number of children * @return a timeJoinNode with @childNum seriesScanNode as children */ - private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int childNum) + private FullOuterTimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int childNum) throws IllegalPathException { - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC); for (int i = 0; i < childNum; i++) { SeriesScanNode seriesScanNode = new SeriesScanNode( new PlanNodeId(String.format("SeriesScanNode%d", i)), new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32)); typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32); - timeJoinNode.addChild(seriesScanNode); + fullOuterTimeJoinNode.addChild(seriesScanNode); } - return timeJoinNode; + return fullOuterTimeJoinNode; } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java index 9aaef681fe2..c434a3fd270 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java @@ -35,10 +35,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; @@ -176,9 +176,9 @@ public class QueryLogicalPlanUtil { Ordering.ASC, false)); - TimeJoinNode timeJoinNode = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC, sourceNodeList); - OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), timeJoinNode, 10); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC, sourceNodeList); + OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), fullOuterTimeJoinNode, 10); LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 10); querySQLs.add(sql); @@ -209,8 +209,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d1.s2"), Ordering.DESC)); - TimeJoinNode timeJoinNode = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList); GreaterThanExpression valueFilter1 = new GreaterThanExpression( @@ -227,7 +227,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode, + fullOuterTimeJoinNode, new Expression[] {new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))}, predicate, false, @@ -265,8 +265,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d1.s2"), Ordering.DESC)); - TimeJoinNode timeJoinNode1 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); + FullOuterTimeJoinNode fullOuterTimeJoinNode1 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); GreaterThanExpression predicate1 = new GreaterThanExpression( @@ -276,7 +276,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode1 = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode1, + fullOuterTimeJoinNode1, new Expression[] { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s3")), new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), @@ -304,8 +304,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d2.s4"), Ordering.DESC)); - TimeJoinNode timeJoinNode2 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); + FullOuterTimeJoinNode fullOuterTimeJoinNode2 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); GreaterThanExpression predicate2 = new GreaterThanExpression( @@ -315,7 +315,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode2 = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode2, + fullOuterTimeJoinNode2, new Expression[] { new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), @@ -465,9 +465,9 @@ public class QueryLogicalPlanUtil { Ordering.DESC, null)); - TimeJoinNode timeJoinNode = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC, sourceNodeList); - OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), timeJoinNode, 10); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC, sourceNodeList); + OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), fullOuterTimeJoinNode, 10); LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 10); querySQLs.add(sql); @@ -676,8 +676,8 @@ public class QueryLogicalPlanUtil { Ordering.DESC, null)); - TimeJoinNode timeJoinNode1 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); + FullOuterTimeJoinNode fullOuterTimeJoinNode1 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); List<PlanNode> sourceNodeList2 = new ArrayList<>(); sourceNodeList2.add( @@ -710,8 +710,8 @@ public class QueryLogicalPlanUtil { Ordering.DESC, null)); - TimeJoinNode timeJoinNode2 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); + FullOuterTimeJoinNode fullOuterTimeJoinNode2 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(); deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(2, 1, 3)); @@ -726,8 +726,8 @@ public class QueryLogicalPlanUtil { Arrays.asList( ColumnHeaderConstant.DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"), deviceToMeasurementIndexesMap); - deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1); - deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2); + deviceViewNode.addChildDeviceNode("root.sg.d1", fullOuterTimeJoinNode1); + deviceViewNode.addChildDeviceNode("root.sg.d2", fullOuterTimeJoinNode2); OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), deviceViewNode, 100); LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100); @@ -765,8 +765,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d1.s2"), Ordering.DESC)); - TimeJoinNode timeJoinNode = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList); GreaterThanExpression valueFilter1 = new GreaterThanExpression( @@ -780,7 +780,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode, + fullOuterTimeJoinNode, new Expression[] { new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), @@ -891,8 +891,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d1.s2"), Ordering.DESC)); - TimeJoinNode timeJoinNode1 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); + FullOuterTimeJoinNode fullOuterTimeJoinNode1 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1); GreaterThanExpression predicate1 = new GreaterThanExpression( @@ -902,7 +902,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode1 = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode1, + fullOuterTimeJoinNode1, new Expression[] { new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), @@ -947,8 +947,8 @@ public class QueryLogicalPlanUtil { (MeasurementPath) schemaMap.get("root.sg.d2.s2"), Ordering.DESC)); - TimeJoinNode timeJoinNode2 = - new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); + FullOuterTimeJoinNode fullOuterTimeJoinNode2 = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2); GreaterThanExpression predicate2 = new GreaterThanExpression( @@ -958,7 +958,7 @@ public class QueryLogicalPlanUtil { FilterNode filterNode2 = new FilterNode( queryId.genPlanNodeId(), - timeJoinNode2, + fullOuterTimeJoinNode2, new Expression[] { new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AggregationDistributionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AggregationDistributionTest.java index c1f2f7701a3..797cfd2b72a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AggregationDistributionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AggregationDistributionTest.java @@ -38,10 +38,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; @@ -391,10 +391,13 @@ public class AggregationDistributionTest { TAggregationType.COUNT, AggregationStep.PARTIAL, null); - TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC); - timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, TAggregationType.COUNT)); - timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, TAggregationType.COUNT)); - slidingWindowAggregationNode.addChild(timeJoinNode); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC); + fullOuterTimeJoinNode.addChild( + genAggregationSourceNode(queryId, d3s1Path, TAggregationType.COUNT)); + fullOuterTimeJoinNode.addChild( + genAggregationSourceNode(queryId, d4s1Path, TAggregationType.COUNT)); + slidingWindowAggregationNode.addChild(fullOuterTimeJoinNode); GroupByLevelNode groupByLevelNode = new GroupByLevelNode( @@ -608,10 +611,14 @@ public class AggregationDistributionTest { String groupedPathS1 = "root.sg.*.s1"; String groupedPathS2 = "root.sg.*.s2"; - TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC); - timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT)); - timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s2Path, TAggregationType.COUNT)); - timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT)); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(queryId.genPlanNodeId(), Ordering.ASC); + fullOuterTimeJoinNode.addChild( + genAggregationSourceNode(queryId, d1s1Path, TAggregationType.COUNT)); + fullOuterTimeJoinNode.addChild( + genAggregationSourceNode(queryId, d1s2Path, TAggregationType.COUNT)); + fullOuterTimeJoinNode.addChild( + genAggregationSourceNode(queryId, d2s1Path, TAggregationType.COUNT)); SlidingWindowAggregationNode slidingWindowAggregationNode = genSlidingWindowAggregationNode( @@ -621,7 +628,7 @@ public class AggregationDistributionTest { TAggregationType.COUNT, AggregationStep.PARTIAL, null); - slidingWindowAggregationNode.addChild(timeJoinNode); + slidingWindowAggregationNode.addChild(fullOuterTimeJoinNode); GroupByLevelNode groupByLevelNode = new GroupByLevelNode( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java index 6da7ef8ae69..a73f2689a63 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java @@ -30,9 +30,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -215,7 +215,7 @@ public class AlignByDeviceOrderByLimitOffsetTest { assertTrue(firstFiTopNode instanceof TopKNode); for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) { assertTrue(node instanceof DeviceViewNode); - assertTrue(node.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(node.getChildren().get(0) instanceof FullOuterTimeJoinNode); } assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode); assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignedByDeviceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignedByDeviceTest.java index f300d0231e6..28937482614 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignedByDeviceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignedByDeviceTest.java @@ -31,10 +31,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.Aggregatio import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; @@ -140,7 +140,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -173,7 +173,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -222,7 +222,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -265,7 +265,7 @@ public class AlignedByDeviceTest { assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof AggregationNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -306,8 +306,8 @@ public class AlignedByDeviceTest { .get(2) instanceof ExchangeNode); assertTrue(f2Root instanceof IdentitySinkNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); - assertTrue(f2Root.getChildren().get(1) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); + assertTrue(f2Root.getChildren().get(1) instanceof FullOuterTimeJoinNode); } @Test @@ -431,7 +431,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -481,7 +481,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -547,7 +547,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -612,7 +612,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -659,8 +659,8 @@ public class AlignedByDeviceTest { .get(2) instanceof ExchangeNode); assertTrue(f2Root instanceof ShuffleSinkNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); - assertTrue(f2Root.getChildren().get(1) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); + assertTrue(f2Root.getChildren().get(1) instanceof FullOuterTimeJoinNode); } @Test @@ -762,7 +762,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -795,7 +795,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -845,7 +845,7 @@ public class AlignedByDeviceTest { instanceof FilterNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -888,7 +888,7 @@ public class AlignedByDeviceTest { assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof AggregationNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -929,9 +929,9 @@ public class AlignedByDeviceTest { .get(2) instanceof ExchangeNode); assertTrue(f2Root instanceof IdentitySinkNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f3Root instanceof IdentitySinkNode); - assertTrue(f3Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); } @Test @@ -1059,7 +1059,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -1109,7 +1109,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -1177,7 +1177,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -1242,7 +1242,7 @@ public class AlignedByDeviceTest { .get(0) .getChildren() .get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); assertTrue( f1Root .getChildren() @@ -1289,9 +1289,9 @@ public class AlignedByDeviceTest { .get(2) instanceof ExchangeNode); assertTrue(f2Root instanceof ShuffleSinkNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f3Root instanceof ShuffleSinkNode); - assertTrue(f3Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); } @Test @@ -1311,7 +1311,7 @@ public class AlignedByDeviceTest { assertTrue(f1Root instanceof IdentitySinkNode); assertTrue(f2Root instanceof IdentitySinkNode); assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof TransformNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(2) @@ -1337,7 +1337,7 @@ public class AlignedByDeviceTest { assertTrue(f1Root instanceof IdentitySinkNode); assertTrue(f2Root instanceof ShuffleSinkNode); assertTrue(f1Root.getChildren().get(0) instanceof MergeSortNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof SingleDeviceViewNode); assertTrue( f1Root @@ -1393,13 +1393,13 @@ public class AlignedByDeviceTest { assertTrue(f2Root instanceof IdentitySinkNode); assertTrue(f3Root instanceof IdentitySinkNode); assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f3Root.getChildren().get(0) instanceof TransformNode); assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof TransformNode); assertTrue( f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(2) instanceof ExchangeNode); - assertTrue(f3Root.getChildren().get(0).getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f3Root.getChildren().get(0).getChildren().get(0) instanceof FullOuterTimeJoinNode); } @Test @@ -1445,7 +1445,7 @@ public class AlignedByDeviceTest { assertTrue(f2Root instanceof ShuffleSinkNode); assertTrue(f3Root instanceof ShuffleSinkNode); assertTrue(f1Root.getChildren().get(0) instanceof MergeSortNode); - assertTrue(f2Root.getChildren().get(0) instanceof TimeJoinNode); + assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode); assertTrue(f3Root.getChildren().get(0) instanceof SingleDeviceViewNode); assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof SingleDeviceViewNode); assertTrue( @@ -1463,7 +1463,7 @@ public class AlignedByDeviceTest { instanceof ExchangeNode); assertTrue( f3Root.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TimeJoinNode); + instanceof FullOuterTimeJoinNode); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/DeviceViewNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/DeviceViewNodeSerdeTest.java index 4384cdff5bf..e675bac7ac7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/DeviceViewNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/DeviceViewNodeSerdeTest.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -39,8 +39,10 @@ import static org.junit.Assert.assertEquals; public class DeviceViewNodeSerdeTest { @Test public void testSerializeAndDeserialize() throws IllegalPathException { - TimeJoinNode timeJoinNode1 = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); - TimeJoinNode timeJoinNode2 = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode1 = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode2 = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); DeviceViewNode deviceViewNode = new DeviceViewNode( new PlanNodeId("TestDeviceMergeNode"), @@ -50,8 +52,8 @@ public class DeviceViewNodeSerdeTest { new SortItem(OrderByKey.TIME, Ordering.DESC))), Arrays.asList("s1", "s2"), new HashMap<>()); - deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1); - deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2); + deviceViewNode.addChildDeviceNode("root.sg.d1", fullOuterTimeJoinNode1); + deviceViewNode.addChildDeviceNode("root.sg.d2", fullOuterTimeJoinNode2); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); deviceViewNode.serialize(byteBuffer); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/ExchangeNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/ExchangeNodeSerdeTest.java index aae120d94d0..daed4cd1772 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/ExchangeNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/ExchangeNodeSerdeTest.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannel import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -41,13 +41,14 @@ public class ExchangeNodeSerdeTest { @Test public void testSerializeAndDeserialize() throws IllegalPathException { - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId("TestExchangeNode")); IdentitySinkNode sinkNode = new IdentitySinkNode( new PlanNodeId("sink"), - Collections.singletonList(timeJoinNode), + Collections.singletonList(fullOuterTimeJoinNode), Collections.singletonList( new DownStreamChannelLocation( new TEndPoint("127.0.0.1", 6667), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FillNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FillNodeSerdeTest.java index 57457b7765d..b79fe651054 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FillNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FillNodeSerdeTest.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -38,11 +38,12 @@ public class FillNodeSerdeTest { @Test public void testSerializeAndDeserialize() throws IllegalPathException { - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); FillNode fillNode = new FillNode( new PlanNodeId("TestFillNode"), - timeJoinNode, + fullOuterTimeJoinNode, new FillDescriptor(FillPolicy.VALUE, new LongLiteral("100"), null), Ordering.ASC); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FilterNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FilterNodeSerdeTest.java index 89cb7822a8a..7bc72b5e543 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FilterNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FilterNodeSerdeTest.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -42,11 +42,12 @@ public class FilterNodeSerdeTest { @Test public void testSerializeAndDeserialize() throws IllegalPathException { - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); FilterNode filterNode = new FilterNode( new PlanNodeId("TestFilterNode"), - timeJoinNode, + fullOuterTimeJoinNode, new Expression[] {new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))}, new GreaterThanExpression( new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/TimeJoinNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FullOuterTimeJoinNodeSerdeTest.java similarity index 85% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/TimeJoinNodeSerdeTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FullOuterTimeJoinNodeSerdeTest.java index 350699acf07..bf456efc07a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/TimeJoinNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/FullOuterTimeJoinNodeSerdeTest.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -33,7 +33,7 @@ import java.nio.ByteBuffer; import static org.junit.Assert.assertEquals; -public class TimeJoinNodeSerdeTest { +public class FullOuterTimeJoinNodeSerdeTest { @Test public void testSerializeAndDeserialize() throws IllegalPathException { SeriesScanNode seriesScanNode1 = @@ -55,13 +55,14 @@ public class TimeJoinNodeSerdeTest { 100, null); - TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); - timeJoinNode.addChild(seriesScanNode1); - timeJoinNode.addChild(seriesScanNode2); + FullOuterTimeJoinNode fullOuterTimeJoinNode = + new FullOuterTimeJoinNode(new PlanNodeId("TestTimeJoinNode"), Ordering.ASC); + fullOuterTimeJoinNode.addChild(seriesScanNode1); + fullOuterTimeJoinNode.addChild(seriesScanNode2); ByteBuffer byteBuffer = ByteBuffer.allocate(2048); - timeJoinNode.serialize(byteBuffer); + fullOuterTimeJoinNode.serialize(byteBuffer); byteBuffer.flip(); - assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), timeJoinNode); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), fullOuterTimeJoinNode); } }
