This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch MemoryFreeBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04f518e71162acb85cf0e7d417f1e392214e2751 Author: JackieTien97 <[email protected]> AuthorDate: Tue Apr 4 16:14:18 2023 +0800 Fix a memory free bug and delete useless O(N^2) check in DriverContext --- .../iotdb/db/mpp/common/FragmentInstanceId.java | 2 +- .../db/mpp/execution/driver/DriverContext.java | 10 ---- .../execution/exchange/MPPDataExchangeManager.java | 57 +++++++++++++--------- 3 files changed, 36 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java index 24164c4e89..1d5ca26b22 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java @@ -58,7 +58,7 @@ public class FragmentInstanceId { } public String getFragmentInstanceId() { - return fragmentId + "." + instanceId; + return fragmentId.getId() + "." + instanceId; } public String toString() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java index 58a131545f..9b0ae75f28 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java @@ -29,8 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static com.google.common.base.Preconditions.checkArgument; - public class DriverContext { private boolean inputDriver = true; @@ -51,14 +49,6 @@ public class DriverContext { public OperatorContext addOperatorContext( int operatorId, PlanNodeId planNodeId, String operatorType) { - checkArgument(operatorId >= 0, "operatorId is negative"); - - for (OperatorContext operatorContext : operatorContexts) { - checkArgument( - operatorId != operatorContext.getOperatorId(), - "A context already exists for operatorId %s", - operatorId); - } OperatorContext operatorContext = new OperatorContext(operatorId, planNodeId, operatorType, this); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index 9a58904c79..73729fe0ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -485,11 +485,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { // FragmentInstanceContext FragmentInstanceContext instanceContext) { - LOGGER.debug( - "Create local sink handle to plan node {} of {} for {}", - remotePlanNodeId, - remoteFragmentInstanceId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create local sink handle to plan node {} of {} for {}", + remotePlanNodeId, + remoteFragmentInstanceId, + localFragmentInstanceId); + } SharedTsBlockQueue queue; Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId); @@ -515,7 +517,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { */ public ISinkChannel createLocalSinkChannelForPipeline( DriverContext driverContext, String planNodeId) { - LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID()); + } SharedTsBlockQueue queue = new SharedTsBlockQueue( driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), @@ -537,11 +541,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { // FragmentInstanceContext FragmentInstanceContext instanceContext) { - LOGGER.debug( - "Create sink handle to plan node {} of {} for {}", - remotePlanNodeId, - remoteFragmentInstanceId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create sink handle to plan node {} of {} for {}", + remotePlanNodeId, + remoteFragmentInstanceId, + localFragmentInstanceId); + } return new SinkChannel( remoteEndpoint, @@ -623,7 +629,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { */ public ISourceHandle createLocalSourceHandleForPipeline( SharedTsBlockQueue queue, DriverContext context) { - LOGGER.debug("Create local source handle for {}", context.getDriverTaskID()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Create local source handle for {}", context.getDriverTaskID()); + } return new LocalSourceHandle( queue, new PipelineSourceHandleListenerImpl(context::failed), @@ -647,11 +655,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { + " exists."); } - LOGGER.debug( - "Create local source handle from {} for plan node {} of {}", - remoteFragmentInstanceId, - localPlanNodeId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create local source handle from {} for plan node {} of {}", + remoteFragmentInstanceId, + localPlanNodeId, + localFragmentInstanceId); + } + SharedTsBlockQueue queue; ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId); if (sinkHandle != null) { @@ -692,11 +703,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { + " exists."); } - LOGGER.debug( - "Create source handle from {} for plan node {} of {}", - remoteFragmentInstanceId, - localPlanNodeId, - localFragmentInstanceId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Create source handle from {} for plan node {} of {}", + remoteFragmentInstanceId, + localPlanNodeId, + localFragmentInstanceId); + } SourceHandle sourceHandle = new SourceHandle(
