This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/exchangeMem in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d0352bbbef8a0ec45e8de4910923014dec605bb0 Author: Minghui Liu <[email protected]> AuthorDate: Thu Feb 2 21:34:04 2023 +0800 add maxReturnSize in ExchangeOperator --- .../mpp/execution/operator/source/ExchangeOperator.java | 17 +++++++++++++++-- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 3 ++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index 7e7498fbc1..1d6a42f11b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -37,6 +37,8 @@ public class ExchangeOperator implements SourceOperator { private ListenableFuture<?> isBlocked = NOT_BLOCKED; + private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public ExchangeOperator( OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) { this.operatorContext = operatorContext; @@ -44,6 +46,17 @@ public class ExchangeOperator implements SourceOperator { this.sourceId = sourceId; } + public ExchangeOperator( + OperatorContext operatorContext, + ISourceHandle sourceHandle, + PlanNodeId sourceId, + long maxReturnSize) { + this.operatorContext = operatorContext; + this.sourceHandle = sourceHandle; + this.sourceId = sourceId; + this.maxReturnSize = maxReturnSize; + } + @Override public OperatorContext getOperatorContext() { return operatorContext; @@ -66,12 +79,12 @@ public class ExchangeOperator implements SourceOperator { @Override public long calculateMaxPeekMemory() { - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return maxReturnSize; } @Override public long calculateMaxReturnSize() { - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return maxReturnSize; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index ad6f8b29ca..a98d99a04f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2206,7 +2206,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline( ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(), context.getDriverContext()), - childSource.getPlanNodeId()); + childSource.getPlanNodeId(), + childOperation.calculateMaxReturnSize()); context .getTimeSliceAllocator() .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
