This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch MemoryFreeBug1.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d25e7e8770fc0f6049ca29b2c6b402b5bb9c5db Author: Jackie Tien <[email protected]> AuthorDate: Thu Apr 6 08:57:36 2023 +0800 resolve conflicts --- .../db/mpp/execution/driver/DriverContext.java | 10 ---- .../execution/exchange/MPPDataExchangeManager.java | 57 +++++++++++++--------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java index 58a131545f..9b0ae75f28 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java @@ -29,8 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static com.google.common.base.Preconditions.checkArgument; - public class DriverContext { private boolean inputDriver = true; @@ -51,14 +49,6 @@ public class DriverContext { public OperatorContext addOperatorContext( int operatorId, PlanNodeId planNodeId, String operatorType) { - checkArgument(operatorId >= 0, "operatorId is negative"); - - for (OperatorContext operatorContext : operatorContexts) { - checkArgument( - operatorId != operatorContext.getOperatorId(), - "A context already exists for operatorId %s", - operatorId); - } OperatorContext operatorContext = new OperatorContext(operatorId, planNodeId, operatorType, this); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index 501781b408..a5370f0191 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -479,11 +479,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { // FragmentInstanceContext FragmentInstanceContext instanceContext) { - LOGGER.debug( - "Create local sink handle to plan node {} of {} for {}", - remotePlanNodeId, - remoteFragmentInstanceId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create local sink handle to plan node {} of {} for {}", + remotePlanNodeId, + remoteFragmentInstanceId, + localFragmentInstanceId); + } SharedTsBlockQueue queue; Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId); @@ -509,7 +511,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { */ public ISinkChannel createLocalSinkChannelForPipeline( DriverContext driverContext, String planNodeId) { - LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID()); + } SharedTsBlockQueue queue = new SharedTsBlockQueue( driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), @@ -531,11 +535,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { // FragmentInstanceContext FragmentInstanceContext instanceContext) { - LOGGER.debug( - "Create sink handle to plan node {} of {} for {}", - remotePlanNodeId, - remoteFragmentInstanceId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create sink handle to plan node {} of {} for {}", + remotePlanNodeId, + remoteFragmentInstanceId, + localFragmentInstanceId); + } return new SinkChannel( remoteEndpoint, @@ -617,7 +623,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { */ public ISourceHandle createLocalSourceHandleForPipeline( SharedTsBlockQueue queue, DriverContext context) { - LOGGER.debug("Create local source handle for {}", context.getDriverTaskID()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Create local source handle for {}", context.getDriverTaskID()); + } return new LocalSourceHandle( queue, new PipelineSourceHandleListenerImpl(context::failed), @@ -641,11 +649,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { + " exists."); } - LOGGER.debug( - "Create local source handle from {} for plan node {} of {}", - remoteFragmentInstanceId, - localPlanNodeId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create local source handle from {} for plan node {} of {}", + remoteFragmentInstanceId, + localPlanNodeId, + localFragmentInstanceId); + } + SharedTsBlockQueue queue; ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId); if (sinkHandle != null) { @@ -686,11 +697,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { + " exists."); } - LOGGER.debug( - "Create source handle from {} for plan node {} of {}", - remoteFragmentInstanceId, - localPlanNodeId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create source handle from {} for plan node {} of {}", + remoteFragmentInstanceId, + localPlanNodeId, + localFragmentInstanceId); + } SourceHandle sourceHandle = new SourceHandle(
