This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch memoryLeak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 77ec32f397e0e04aefff1c5d834cc2571ac7a68c Author: lancelly <[email protected]> AuthorDate: Wed Jun 19 17:39:17 2024 +0800 fix --- .../execution/exchange/MPPDataExchangeManager.java | 11 ----------- .../execution/fragment/FragmentInstanceManager.java | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index 49c17b54d74..8f46e1945bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -609,17 +609,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId); } - public void removeSourceHandlesAndShuffleSinkHandlesOfFI(TFragmentInstanceId fragmentInstanceId) { - Map<String, ISourceHandle> removedSourceHandles = sourceHandles.remove(fragmentInstanceId); - if (removedSourceHandles != null) { - removedSourceHandles.values().forEach(ISourceHandle::abort); - } - ISinkHandle shuffleSinkHandle = shuffleSinkHandles.remove(fragmentInstanceId); - if (shuffleSinkHandle != null) { - shuffleSinkHandle.abort(); - } - } - public LocalMemoryManager getLocalMemoryManager() { return localMemoryManager; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 71cb0ab0f98..62e8e939f58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -208,7 +208,7 @@ public class FragmentInstanceManager { private void clearFIRelatedResources(FragmentInstanceId instanceId) { // close and remove all the handles of the fragment instance - exchangeManager.removeSourceHandlesAndShuffleSinkHandlesOfFI(instanceId.toThrift()); + exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift()); // clear MemoryPool exchangeManager.deRegisterFragmentInstanceFromMemoryPool( instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
