This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch memoryLeak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11ec38ec7e7d9ac454cf82adf5cef5a065931d93 Author: lancelly <[email protected]> AuthorDate: Wed Jun 19 16:53:26 2024 +0800 fix --- .../execution/exchange/MPPDataExchangeManager.java | 21 +++++++++++++++++---- .../execution/fragment/FragmentInstanceManager.java | 21 ++++++++++----------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index a38ef923bbb..49c17b54d74 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -585,11 +585,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { ExecutorService executorService, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager) { - this.localMemoryManager = Validate.notNull(localMemoryManager); - this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory); - this.executorService = Validate.notNull(executorService); + this.localMemoryManager = Validate.notNull(localMemoryManager, "localMemoryManager is null."); + this.tsBlockSerdeFactory = + Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null."); + this.executorService = Validate.notNull(executorService, "executorService is null."); this.mppDataExchangeServiceClientManager = - Validate.notNull(mppDataExchangeServiceClientManager); + Validate.notNull( + mppDataExchangeServiceClientManager, "mppDataExchangeServiceClientManager is null."); sourceHandles = new ConcurrentHashMap<>(); shuffleSinkHandles = new ConcurrentHashMap<>(); } @@ -607,6 +609,17 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId); } + public void removeSourceHandlesAndShuffleSinkHandlesOfFI(TFragmentInstanceId fragmentInstanceId) { + Map<String, ISourceHandle> removedSourceHandles = sourceHandles.remove(fragmentInstanceId); + if (removedSourceHandles != null) { + removedSourceHandles.values().forEach(ISourceHandle::abort); + } + ISinkHandle shuffleSinkHandle = shuffleSinkHandles.remove(fragmentInstanceId); + if (shuffleSinkHandle != null) { + shuffleSinkHandle.abort(); + } + } + public LocalMemoryManager getLocalMemoryManager() { return localMemoryManager; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index bea961c7525..71cb0ab0f98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -177,6 +177,7 @@ public class FragmentInstanceManager { instance.isExplainAnalyze(), exchangeManager); } catch (Throwable t) { + clearFIRelatedResources(instanceId); logger.warn("error when create FragmentInstanceExecution.", t); stateMachine.failed(t); return null; @@ -205,6 +206,14 @@ public class FragmentInstanceManager { } } + private void clearFIRelatedResources(FragmentInstanceId instanceId) { + // close and remove all the handles of the fragment instance + exchangeManager.removeSourceHandlesAndShuffleSinkHandlesOfFI(instanceId.toThrift()); + // clear MemoryPool + exchangeManager.deRegisterFragmentInstanceFromMemoryPool( + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId()); + } + private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) { return dataNodeQueryContextMap.computeIfAbsent( queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum)); @@ -249,6 +258,7 @@ public class FragmentInstanceManager { false, exchangeManager); } catch (Throwable t) { + clearFIRelatedResources(instanceId); logger.warn("Execute error caused by ", t); stateMachine.failed(t); return null; @@ -269,17 +279,6 @@ public class FragmentInstanceManager { } } - /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */ - public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) { - instanceExecution.remove(fragmentInstanceId); - FragmentInstanceContext context = instanceContext.get(fragmentInstanceId); - if (context != null) { - context.abort(); - return context.getInstanceInfo(); - } - return null; - } - /** Cancels a FragmentInstance. */ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId, boolean hasThrowable) { logger.debug("[CancelFI]");
