This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/exchangeMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d0352bbbef8a0ec45e8de4910923014dec605bb0
Author: Minghui Liu <[email protected]>
AuthorDate: Thu Feb 2 21:34:04 2023 +0800

    add maxReturnSize in ExchangeOperator
---
 .../mpp/execution/operator/source/ExchangeOperator.java | 17 +++++++++++++++--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java      |  3 ++-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 7e7498fbc1..1d6a42f11b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -37,6 +37,8 @@ public class ExchangeOperator implements SourceOperator {
 
   private ListenableFuture<?> isBlocked = NOT_BLOCKED;
 
+  private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
     this.operatorContext = operatorContext;
@@ -44,6 +46,17 @@ public class ExchangeOperator implements SourceOperator {
     this.sourceId = sourceId;
   }
 
+  public ExchangeOperator(
+      OperatorContext operatorContext,
+      ISourceHandle sourceHandle,
+      PlanNodeId sourceId,
+      long maxReturnSize) {
+    this.operatorContext = operatorContext;
+    this.sourceHandle = sourceHandle;
+    this.sourceId = sourceId;
+    this.maxReturnSize = maxReturnSize;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
     return operatorContext;
@@ -66,12 +79,12 @@ public class ExchangeOperator implements SourceOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return maxReturnSize;
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return maxReturnSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index ad6f8b29ca..a98d99a04f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2206,7 +2206,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
                     ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
                     context.getDriverContext()),
-                childSource.getPlanNodeId());
+                childSource.getPlanNodeId(),
+                childOperation.calculateMaxReturnSize());
         context
             .getTimeSliceAllocator()
             .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);

Reply via email to