This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch pipelinehierarchyTree
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7992acd1b49da8db36aaf9b79003e19b2ae74b3a
Author: lancelly <[email protected]>
AuthorDate: Tue Apr 23 15:12:49 2024 +0800

    finish
---
 .../plan/planner/LocalExecutionPlanContext.java    |  50 ++
 .../plan/planner/LocalExecutionPlanner.java        |  16 +-
 .../plan/planner/OperatorTreeGenerator.java        |  21 +-
 .../ConsumeAllChildrenPipelineMemoryEstimator.java |  38 ++
 ...umeChildrenOneByOnePipelineMemoryEstimator.java |  74 +++
 .../planner/memory/PipelineMemoryEstimator.java    |  68 +++
 .../memory/PipelineMemoryEstimatorFactory.java     |  51 ++
 .../plan/planner/plan/node/PlanNodeType.java       |   2 +
 .../node/process/join/FullOuterTimeJoinNode.java   |   5 +
 .../plan/planner/PipelineBuilderTest.java          | 578 +++++++++++++++++++++
 10 files changed, 896 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
index 5d9a2f8f46d..9a55d298a0e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.queryengine.plan.planner;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
@@ -30,6 +31,10 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimatorFactory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -41,9 +46,12 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -91,6 +99,10 @@ public class LocalExecutionPlanContext {
   // use AtomicReference not for thread-safe, just for updating same field in 
different pipeline
   private AtomicReference<List<Long>> timePartitions = new AtomicReference<>();
 
+  /** Records the parent of each pipeline. The order of each list does not 
matter for now. */
+  private Map<PlanNodeId, List<PipelineMemoryEstimator>> 
parentPlanNodeIdToMemoryEstimator =
+      new ConcurrentHashMap<>();
+
   // for data region
   public LocalExecutionPlanContext(
       TypeProvider typeProvider,
@@ -122,6 +134,7 @@ public class LocalExecutionPlanContext {
         
parentContext.getDriverContext().createSubDriverContext(getNextPipelineId());
     this.dataNodeQueryContext = parentContext.dataNodeQueryContext;
     this.timePartitions = parentContext.timePartitions;
+    this.parentPlanNodeIdToMemoryEstimator = 
parentContext.parentPlanNodeIdToMemoryEstimator;
   }
 
   // for schema region
@@ -146,6 +159,38 @@ public class LocalExecutionPlanContext {
         new PipelineDriverFactory(operation, driverContext, 
estimatedMemorySize));
   }
 
+  /**
+   * Each time we construct a pipeline, we should also construct the memory 
estimator for the
+   * pipeline.
+   *
+   * @param operation the root operator of the pipeline
+   * @param parentPlanNodeId the parent plan node id of the root of the 
pipeline
+   * @param root the root node of the pipeline
+   * @param dependencyPipelineIndex the index of the dependency pipeline, -1 
if no dependency
+   */
+  public PipelineMemoryEstimator constructPipelineMemoryEstimator(
+      final Operator operation,
+      @Nullable final PlanNodeId parentPlanNodeId,
+      final PlanNode root,
+      final int dependencyPipelineIndex) {
+    PipelineMemoryEstimator currentPipelineMemoryEstimator =
+        PipelineMemoryEstimatorFactory.createPipelineMemoryEstimator(
+            operation, root, dependencyPipelineIndex);
+    // As OperatorTreeGenerator traverse the tree in a post-order way, all the 
children of current
+    // pipeline have been recorded in the map.
+    List<PipelineMemoryEstimator> childrenMemoryEstimators =
+        parentPlanNodeIdToMemoryEstimator.get(root.getPlanNodeId());
+    if (childrenMemoryEstimators != null) {
+      currentPipelineMemoryEstimator.addChildren(childrenMemoryEstimators);
+    }
+    if (parentPlanNodeId != null) {
+      parentPlanNodeIdToMemoryEstimator
+          .computeIfAbsent(parentPlanNodeId, k -> new LinkedList<>())
+          .add(currentPipelineMemoryEstimator);
+    }
+    return currentPipelineMemoryEstimator;
+  }
+
   public LocalExecutionPlanContext createSubContext() {
     return new LocalExecutionPlanContext(this);
   }
@@ -305,4 +350,9 @@ public class LocalExecutionPlanContext {
   public TemplatedInfo getTemplatedInfo() {
     return typeProvider.getTemplatedInfo();
   }
+
+  @TestOnly
+  public Map<PlanNodeId, List<PipelineMemoryEstimator>> 
getParentPlanNodeIdToMemoryEstimator() {
+    return parentPlanNodeIdToMemoryEstimator;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 6acd4e1b716..2b6c4e173fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateM
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -82,8 +83,11 @@ public class LocalExecutionPlanner {
     // TODO Replace operator with operatorFactory to build multiple driver for 
one pipeline
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    PipelineMemoryEstimator memoryEstimator =
+        context.constructPipelineMemoryEstimator(root, null, plan, -1);
+
     // check whether current free memory is enough to execute current query
-    long estimatedMemorySize = checkMemory(root, 
instanceContext.getStateMachine());
+    long estimatedMemorySize = checkMemory(memoryEstimator, 
instanceContext.getStateMachine());
 
     context.addPipelineDriverFactory(root, context.getDriverContext(), 
estimatedMemorySize);
 
@@ -105,8 +109,11 @@ public class LocalExecutionPlanner {
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    PipelineMemoryEstimator memoryEstimator =
+        context.constructPipelineMemoryEstimator(root, null, plan, -1);
+
     // check whether current free memory is enough to execute current query
-    checkMemory(root, instanceContext.getStateMachine());
+    checkMemory(memoryEstimator, instanceContext.getStateMachine());
 
     context.addPipelineDriverFactory(root, context.getDriverContext(), 0);
 
@@ -116,7 +123,8 @@ public class LocalExecutionPlanner {
     return context.getPipelineDriverFactories();
   }
 
-  private long checkMemory(Operator root, FragmentInstanceStateMachine 
stateMachine)
+  private long checkMemory(
+      final PipelineMemoryEstimator memoryEstimator, 
FragmentInstanceStateMachine stateMachine)
       throws MemoryNotEnoughException {
 
     // if it is disabled, just return
@@ -125,7 +133,7 @@ public class LocalExecutionPlanner {
       return 0;
     }
 
-    long estimatedMemorySize = root.calculateMaxPeekMemoryWithCounter();
+    long estimatedMemorySize = memoryEstimator.calculateEstimatedMemorySize();
     
QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize);
 
     synchronized (this) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index a6d043f5ad2..8e46e4ec6ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -154,6 +154,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimatorFactory;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -2987,7 +2988,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             subContext.setDegreeOfParallelism(dopForChild);
 
             int originPipeNum = context.getPipelineNumber();
-            Operator sourceOperator = createNewPipelineForChildNode(context, 
subContext, childNode);
+            Operator sourceOperator =
+                createNewPipelineForChildNode(context, subContext, childNode, 
node.getPlanNodeId());
             parentPipelineChildren.add(sourceOperator);
             dopForChild =
                 Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 
- originPipeNum));
@@ -3029,7 +3031,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           }
 
           Operator sourceOperator =
-              createNewPipelineForChildNode(context, subContext, 
partialParentNode);
+              createNewPipelineForChildNode(
+                  context, subContext, partialParentNode, 
node.getPlanNodeId());
           parentPipelineChildren.add(sourceOperator);
           afterwardsNodes.add(partialParentNode);
           finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
@@ -3091,7 +3094,10 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
   }
 
   private Operator createNewPipelineForChildNode(
-      LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, 
PlanNode childNode) {
+      LocalExecutionPlanContext context,
+      LocalExecutionPlanContext subContext,
+      PlanNode childNode,
+      PlanNodeId parentNodeId) {
     Operator childOperation = childNode.accept(this, subContext);
     ISinkChannel localSinkChannel =
         MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
@@ -3099,6 +3105,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             subContext.getDriverContext(), childNode.getPlanNodeId().getId());
     subContext.setISink(localSinkChannel);
     subContext.addPipelineDriverFactory(childOperation, 
subContext.getDriverContext(), 0);
+    subContext.constructPipelineMemoryEstimator(childOperation, parentNodeId, 
childNode, -1);
 
     ExchangeOperator sourceOperator =
         new ExchangeOperator(
@@ -3118,6 +3125,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
   public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
+    
checkArgument(PipelineMemoryEstimatorFactory.isConsumeChildrenOneByOneNode(node));
     List<Operator> parentPipelineChildren = new ArrayList<>();
     int originExchangeNum = context.getExchangeSumNum();
     int finalExchangeNum = context.getExchangeSumNum();
@@ -3162,6 +3170,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           // actually is dop
           int curChildPipelineNum =
               Math.min(dopForChild, subContext.getPipelineNumber() - 
originPipeNum);
+
           childPipelineNums.add(curChildPipelineNum);
           sumOfChildPipelines += curChildPipelineNum;
           // If sumOfChildPipelines > dopForChild, we have to wait until some 
pipelines finish
@@ -3182,6 +3191,12 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             }
           }
 
+          subContext.constructPipelineMemoryEstimator(
+              childOperation,
+              node.getPlanNodeId(),
+              childNode,
+              dependencyChildNode == 0 ? -1 : dependencyPipeId);
+
           ExchangeOperator sourceOperator =
               new ExchangeOperator(
                   context
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java
new file mode 100644
index 00000000000..d31c5873d60
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeAllChildrenPipelineMemoryEstimator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+
+public class ConsumeAllChildrenPipelineMemoryEstimator extends 
PipelineMemoryEstimator {
+
+  public ConsumeAllChildrenPipelineMemoryEstimator(
+      final Operator root, final int dependencyPipelineIndex) {
+    super(root, dependencyPipelineIndex);
+  }
+
+  @Override
+  public long calculateEstimatedMemorySize() {
+    return root.calculateMaxPeekMemoryWithCounter()
+        + children.stream()
+            .map(PipelineMemoryEstimator::calculateEstimatedMemorySize)
+            .reduce(0L, Long::sum);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java
new file mode 100644
index 00000000000..71c431fea77
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ConsumeChildrenOneByOnePipelineMemoryEstimator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+
+public class ConsumeChildrenOneByOnePipelineMemoryEstimator extends 
PipelineMemoryEstimator {
+
+  private long concurrentRunningChildrenNum = -1;
+
+  private boolean concurrentRunningChildrenNumInitialized = false;
+
+  public ConsumeChildrenOneByOnePipelineMemoryEstimator(
+      final Operator root, final int dependencyPipelineIndex) {
+    super(root, dependencyPipelineIndex);
+  }
+
+  @Override
+  public long calculateEstimatedMemorySize() {
+    // EstimatedSize = root.calculateMaxPeekMemoryWithCounter() + 
sum(children's estimated size) *
+    // runningChildrenNum / children.size() + retainedSize of not running 
children
+    return children.isEmpty()
+        ? root.calculateMaxPeekMemoryWithCounter()
+        : (long)
+            (root.calculateMaxPeekMemoryWithCounter()
+                + children.stream()
+                        
.map(PipelineMemoryEstimator::calculateEstimatedMemorySize)
+                        .reduce(0L, Long::sum)
+                    / (double) (children.size())
+                    * getConcurrentRunningChildrenNum()
+                + children.stream()
+                        
.map(PipelineMemoryEstimator::calculateRetainedMemorySize)
+                        .reduce(0L, Long::sum)
+                    / (double) (children.size())
+                    * (children.size() - getConcurrentRunningChildrenNum()));
+  }
+
+  private long getConcurrentRunningChildrenNum() {
+    if (concurrentRunningChildrenNumInitialized) {
+      return concurrentRunningChildrenNum;
+    }
+    concurrentRunningChildrenNum =
+        children.stream()
+            .filter(memoryEstimator -> 
memoryEstimator.getDependencyPipelineIndex() == -1)
+            .count();
+    concurrentRunningChildrenNumInitialized = true;
+    return concurrentRunningChildrenNum;
+  }
+
+  @TestOnly
+  public long getConcurrentRunningChildrenNumForTest() {
+    return children.stream()
+        .filter(memoryEstimator -> 
memoryEstimator.getDependencyPipelineIndex() == -1)
+        .count();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java
new file mode 100644
index 00000000000..cdf16571ca3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class PipelineMemoryEstimator {
+
+  protected final List<PipelineMemoryEstimator> children;
+
+  protected final Operator root;
+
+  protected final int dependencyPipelineIndex;
+
+  protected PipelineMemoryEstimator(final Operator root, final int 
dependencyPipelineIndex) {
+    this.root = root;
+    this.dependencyPipelineIndex = dependencyPipelineIndex;
+    this.children = new LinkedList<>();
+  }
+  /** Calculate the estimated memory size of the pipeline. */
+  public abstract long calculateEstimatedMemorySize();
+
+  protected long calculateRetainedMemorySize() {
+    return root.calculateRetainedSizeAfterCallingNext()
+        + children.stream()
+            .map(PipelineMemoryEstimator::calculateRetainedMemorySize)
+            .reduce(0L, Long::sum);
+  }
+
+  public void addChildren(final List<PipelineMemoryEstimator> child) {
+    children.addAll(child);
+  }
+
+  protected int getDependencyPipelineIndex() {
+    return dependencyPipelineIndex;
+  }
+
+  @TestOnly
+  public List<PipelineMemoryEstimator> getChildren() {
+    return children;
+  }
+
+  @TestOnly
+  public Operator getRoot() {
+    return root;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java
new file mode 100644
index 00000000000..ea637b586e4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/PipelineMemoryEstimatorFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+
+public class PipelineMemoryEstimatorFactory {
+
+  private PipelineMemoryEstimatorFactory() {
+    // hidden constructor
+  }
+
+  public static PipelineMemoryEstimator createPipelineMemoryEstimator(
+      final Operator rootOperator, final PlanNode root, final int 
dependencyPipelineIndex) {
+    return isConsumeChildrenOneByOneNode(root)
+        ? new ConsumeChildrenOneByOnePipelineMemoryEstimator(rootOperator, 
dependencyPipelineIndex)
+        : new ConsumeAllChildrenPipelineMemoryEstimator(rootOperator, 
dependencyPipelineIndex);
+  }
+
+  public static boolean isConsumeChildrenOneByOneNode(final PlanNode node) {
+    switch (node.getType()) {
+      case SCHEMA_QUERY_MERGE:
+      case COUNT_MERGE:
+      case DEVICE_VIEW:
+      case IDENTITY_SINK:
+      case SCHEMA_FETCH_MERGE:
+      case LAST_QUERY_COLLECT:
+        return true;
+      default:
+        return false;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index bda66843a97..6830a369a6d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -209,6 +209,8 @@ public enum PlanNodeType {
   EXPLAIN_ANALYZE((short) 90),
 
   PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
+
+  FULL_OUTER_TIME_JOIN_REFERENCE((short) 92),
   ;
 
   public static final int BYTES = Short.BYTES;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java
index 750bc2bfc00..5ca6c352da1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/FullOuterTimeJoinNode.java
@@ -140,4 +140,9 @@ public class FullOuterTimeJoinNode extends 
MultiChildProcessNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), mergeOrder);
   }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.FULL_OUTER_TIME_JOIN;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
index 2ec572ede00..c7005a696a3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineBuilderTest.java
@@ -34,11 +34,15 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ConsumeAllChildrenPipelineMemoryEstimator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ConsumeChildrenOneByOnePipelineMemoryEstimator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.PipelineMemoryEstimator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
@@ -48,6 +52,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDevi
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
@@ -72,6 +77,7 @@ import java.util.concurrent.ExecutorService;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey.DEVICE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class PipelineBuilderTest {
 
@@ -108,6 +114,18 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(0, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = fullOuterTimeJoinNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator pipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          root.calculateMaxPeekMemoryWithCounter(),
+          pipelineMemoryEstimator.calculateEstimatedMemorySize());
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -160,6 +178,38 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(1, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root =
+        fullOuterTimeJoinNode.accept(
+            operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          ConsumeAllChildrenPipelineMemoryEstimator.class, 
rootPipelineMemoryEstimator.getClass());
+      // test calculateEstimatedMemory
+      assertEquals(
+          root.calculateMaxPeekMemoryWithCounter()
+              + rootPipelineMemoryEstimator.getChildren().stream()
+                  .map(PipelineMemoryEstimator::calculateEstimatedMemorySize)
+                  .reduce(0L, Long::sum),
+          rootPipelineMemoryEstimator.calculateEstimatedMemorySize());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(1, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 1; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            FullOuterTimeJoinOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -218,6 +268,36 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(2, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root =
+        fullOuterTimeJoinNode.accept(
+            operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          ConsumeAllChildrenPipelineMemoryEstimator.class, 
rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(2, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 2; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        if (i == 0) {
+          assertEquals(
+              SeriesScanOperator.class,
+              childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        } else {
+          assertEquals(
+              FullOuterTimeJoinOperator.class,
+              childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -275,6 +355,29 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(3, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root =
+        fullOuterTimeJoinNode.accept(
+            operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          ConsumeAllChildrenPipelineMemoryEstimator.class, 
rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(3, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 3; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            SeriesScanOperator.class, 
childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -337,6 +440,29 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(4, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root =
+        fullOuterTimeJoinNode.accept(
+            operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          ConsumeAllChildrenPipelineMemoryEstimator.class, 
rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            SeriesScanOperator.class, 
childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -399,6 +525,29 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(4, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root =
+        fullOuterTimeJoinNode.accept(
+            operatorTreeGenerator, 
createLocalExecutionPlanContext(typeProvider))) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
fullOuterTimeJoinNode, -1);
+      assertEquals(
+          ConsumeAllChildrenPipelineMemoryEstimator.class, 
rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            SeriesScanOperator.class, 
childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -479,6 +628,21 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(0, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(0, childrenPipelineMemoryEstimators.size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -535,6 +699,29 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(1, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            AlignedSeriesScanOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -591,6 +778,29 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(2, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            AlignedSeriesScanOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -647,6 +857,30 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(3, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            AlignedSeriesScanOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -703,6 +937,30 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(4, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            AlignedSeriesScanOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -759,6 +1017,30 @@ public class PipelineBuilderTest {
 
     // Validate the number exchange operator
     assertEquals(4, context.getExchangeSumNum());
+
+    // Validate PipelineMemoryEstimator
+    try {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(null, null, deviceViewNode, 
-1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(4, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            AlignedSeriesScanOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   /**
@@ -1038,6 +1320,268 @@ public class PipelineBuilderTest {
     assertEquals(3, context.getExchangeSumNum());
   }
 
+  @Test
+  public void testIdentitySinkNodeMemoryEstimatorWithDop1() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider);
+    LocalExecutionPlanContext context = 
createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = identitySinkNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
identitySinkNode, -1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      assertEquals(
+          root.calculateMaxPeekMemoryWithCounter(),
+          rootPipelineMemoryEstimator.calculateEstimatedMemorySize());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(0, childrenPipelineMemoryEstimators.size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * The operator structure is:
+   *
+   * <p>IdentitySinkOperator - [ExchangeOperator1, ExchangeOperator2, 
ExchangeOperator3]
+   *
+   * <p>This test will test dop = 2. Expected result is four pipelines with 
dependency:
+   *
+   * <p>The pipeline0 is: ExchangeOperator1 - FullOuterTimeJoinOperator1.
+   *
+   * <p>The pipeline1 is: ExchangeOperator2 - FullOuterTimeJoinOperator2, 
which has dependency 0.
+   *
+   * <p>The pipeline2 is: ExchangeOperator2 - FullOuterTimeJoinOperator2, 
which has dependency 1.
+   *
+   * <p>The pipeline3 is: IdentitySinkOperator - [ExchangeOperator1, 
ExchangeOperator2,
+   * ExchangeOperator3]
+   */
+  @Test
+  public void testIdentitySinkNodeMemoryEstimatorWithDop2() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider);
+    LocalExecutionPlanContext context =
+        createLocalExecutionPlanContextWithQueryId(typeProvider, new 
QueryId("test_dop2"));
+    context.setDegreeOfParallelism(2);
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = identitySinkNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
identitySinkNode, -1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      assertEquals(
+          root.calculateMaxPeekMemoryWithCounter()
+              + rootPipelineMemoryEstimator.getChildren().stream()
+                      
.map(PipelineMemoryEstimator::calculateEstimatedMemorySize)
+                      .reduce(0L, Long::sum)
+                  / 3
+              + rootPipelineMemoryEstimator
+                      .getChildren()
+                      .get(1)
+                      .getRoot()
+                      .calculateRetainedSizeAfterCallingNext()
+                  * 2,
+          rootPipelineMemoryEstimator.calculateEstimatedMemorySize());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(3, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 3; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            FullOuterTimeJoinOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        assertEquals(0, 
childrenPipelineMemoryEstimators.get(i).getChildren().size());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * The operator structure is:
+   *
+   * <p>IdentitySinkOperator - [ExchangeOperator - FullOuterTimeJoinOperator1 -
+   * [SeriesScanOperator1, ExchangeOperator, ExchangeOperator], 
ExchangeOperator -
+   * FullOuterTimeJoinOperator2 - [SeriesScanOperator4, ExchangeOperator, 
ExchangeOperator],
+   * ExchangeOperator - FullOuterTimeJoinOperator3 - [SeriesScanOperator7, 
ExchangeOperator,
+   * ExchangeOperator]]
+   *
+   * <p>This test will test dop = 4. Expected result is ten pipelines with 
dependency:
+   *
+   * <p>The pipeline0 is: ExchangeOperator - SeriesOperator2.
+   *
+   * <p>The pipeline1 is: ExchangeOperator - SeriesOperator3.
+   *
+   * <p>The pipeline2 is: ExchangeOperator - FullOuterTimeJoinOperator1 - 
[SeriesScanOperator1,
+   * ExchangeOperator, ExchangeOperator]
+   *
+   * <p>The pipeline3 is: ExchangeOperator - SeriesOperator5, which has 
dependency 2.
+   *
+   * <p>The pipeline4 is: ExchangeOperator - SeriesOperator6, which has 
dependency 2.
+   *
+   * <p>The pipeline5 is: ExchangeOperator - FullOuterTimeJoinOperator2 - 
[SeriesScanOperator4,
+   * ExchangeOperator, ExchangeOperator], which has dependency 2.
+   *
+   * <p>The pipeline7 is: ExchangeOperator - SeriesOperator5, which has 
dependency 5.
+   *
+   * <p>The pipeline8 is: ExchangeOperator - SeriesOperator6, which has 
dependency 5.
+   *
+   * <p>The pipeline9 is: ExchangeOperator - FullOuterTimeJoinOperator2 - 
[SeriesScanOperator4,
+   * ExchangeOperator, ExchangeOperator], which has dependency 5.
+   *
+   * <p>The pipeline10 is: IdentitySinkOperator - [ExchangeOperator1, 
ExchangeOperator2,
+   * ExchangeOperator3]
+   */
+  @Test
+  public void testIdentitySinkNodeMemoryEstimatorWithDop4() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider);
+    LocalExecutionPlanContext context =
+        createLocalExecutionPlanContextWithQueryId(typeProvider, new 
QueryId("test_dop4"));
+    context.setDegreeOfParallelism(4);
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = identitySinkNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
identitySinkNode, -1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      assertEquals(
+          1,
+          ((ConsumeChildrenOneByOnePipelineMemoryEstimator) 
rootPipelineMemoryEstimator)
+              .getConcurrentRunningChildrenNumForTest());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(3, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 3; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            FullOuterTimeJoinOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        assertEquals(2, 
childrenPipelineMemoryEstimators.get(i).getChildren().size());
+        for (int j = 0; j < 2; j++) {
+          assertEquals(
+              ConsumeAllChildrenPipelineMemoryEstimator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass());
+          assertEquals(
+              SeriesScanOperator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testIdentitySinkNodeMemoryEstimatorWithDop8() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider);
+    LocalExecutionPlanContext context =
+        createLocalExecutionPlanContextWithQueryId(typeProvider, new 
QueryId("test_dop16"));
+    context.setDegreeOfParallelism(8);
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = identitySinkNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
identitySinkNode, -1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      assertEquals(
+          1,
+          ((ConsumeChildrenOneByOnePipelineMemoryEstimator) 
rootPipelineMemoryEstimator)
+              .getConcurrentRunningChildrenNumForTest());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(3, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 3; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            FullOuterTimeJoinOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        for (int j = 0; j < 3; j++) {
+          assertEquals(
+              ConsumeAllChildrenPipelineMemoryEstimator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass());
+          assertEquals(
+              SeriesScanOperator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testIdentitySinkNodeMemoryEstimatorWithDop16() throws 
IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    IdentitySinkNode identitySinkNode = initIdentitySinkNode(typeProvider);
+    LocalExecutionPlanContext context =
+        createLocalExecutionPlanContextWithQueryId(typeProvider, new 
QueryId("test_dop8"));
+    context.setDegreeOfParallelism(16);
+
+    // Validate PipelineMemoryEstimator
+    try (Operator root = identitySinkNode.accept(operatorTreeGenerator, 
context)) {
+      PipelineMemoryEstimator rootPipelineMemoryEstimator =
+          context.constructPipelineMemoryEstimator(root, null, 
identitySinkNode, -1);
+      assertEquals(
+          ConsumeChildrenOneByOnePipelineMemoryEstimator.class,
+          rootPipelineMemoryEstimator.getClass());
+      // all the pipeline under this node will be executed concurrently
+      assertEquals(
+          3,
+          ((ConsumeChildrenOneByOnePipelineMemoryEstimator) 
rootPipelineMemoryEstimator)
+              .getConcurrentRunningChildrenNumForTest());
+
+      List<PipelineMemoryEstimator> childrenPipelineMemoryEstimators =
+          rootPipelineMemoryEstimator.getChildren();
+      assertEquals(3, childrenPipelineMemoryEstimators.size());
+      for (int i = 0; i < 3; i++) {
+        assertEquals(
+            ConsumeAllChildrenPipelineMemoryEstimator.class,
+            childrenPipelineMemoryEstimators.get(i).getClass());
+        assertEquals(
+            FullOuterTimeJoinOperator.class,
+            childrenPipelineMemoryEstimators.get(i).getRoot().getClass());
+        for (int j = 0; j < 3; j++) {
+          assertEquals(
+              ConsumeAllChildrenPipelineMemoryEstimator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getClass());
+          assertEquals(
+              SeriesScanOperator.class,
+              
childrenPipelineMemoryEstimators.get(i).getChildren().get(j).getRoot().getClass());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   private LocalExecutionPlanContext 
createLocalExecutionPlanContext(TypeProvider typeProvider) {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
@@ -1056,6 +1600,24 @@ public class PipelineBuilderTest {
         typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
   }
 
+  private LocalExecutionPlanContext createLocalExecutionPlanContextWithQueryId(
+      TypeProvider typeProvider, QueryId queryId) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    fragmentInstanceContext.setDataRegion(dataRegion);
+
+    return new LocalExecutionPlanContext(
+        typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
+  }
+
   /**
    * This method will init a timeJoinNode with @childNum seriesScanNode as 
children.
    *
@@ -1168,4 +1730,20 @@ public class PipelineBuilderTest {
     }
     return topKNode;
   }
+
+  private IdentitySinkNode initIdentitySinkNode(TypeProvider typeProvider)
+      throws IllegalPathException {
+    FullOuterTimeJoinNode fullOuterTimeJoinNode1 = 
initFullOuterTimeJoinNode(typeProvider, 3);
+    FullOuterTimeJoinNode fullOuterTimeJoinNode2 = 
initFullOuterTimeJoinNode(typeProvider, 3);
+    FullOuterTimeJoinNode fullOuterTimeJoinNode3 = 
initFullOuterTimeJoinNode(typeProvider, 3);
+    fullOuterTimeJoinNode1.setPlanNodeId(new 
PlanNodeId("FullOuterTimeJoinNode1"));
+    fullOuterTimeJoinNode2.setPlanNodeId(new 
PlanNodeId("FullOuterTimeJoinNode2"));
+    fullOuterTimeJoinNode3.setPlanNodeId(new 
PlanNodeId("FullOuterTimeJoinNode3"));
+
+    IdentitySinkNode identitySinkNode = new IdentitySinkNode(new 
PlanNodeId("IdentitySinkNode"));
+    identitySinkNode.addChild(fullOuterTimeJoinNode1);
+    identitySinkNode.addChild(fullOuterTimeJoinNode2);
+    identitySinkNode.addChild(fullOuterTimeJoinNode3);
+    return identitySinkNode;
+  }
 }

Reply via email to