This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch memoryLeak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a2ff6d920effb71add7af06b0182146bac6b481 Author: lancelly <[email protected]> AuthorDate: Wed Jun 19 18:20:23 2024 +0800 add forceDeregister --- .../queryengine/execution/exchange/MPPDataExchangeManager.java | 5 +++-- .../execution/fragment/FragmentInstanceExecution.java | 2 +- .../queryengine/execution/fragment/FragmentInstanceManager.java | 2 +- .../apache/iotdb/db/queryengine/execution/memory/MemoryPool.java | 9 ++++++++- .../iotdb/db/queryengine/plan/execution/QueryExecution.java | 3 ++- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index 8f46e1945bc..ac0b4111090 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -603,10 +603,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { return mppDataExchangeService; } - public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) { + public void deRegisterFragmentInstanceFromMemoryPool( + String queryId, String fragmentInstanceId, boolean forceDeregister) { localMemoryManager .getQueryPool() - .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId); + .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId, forceDeregister); } public LocalMemoryManager getLocalMemoryManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index d4ba3abd969..b31d4e13eeb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -309,7 +309,7 @@ public class FragmentInstanceExecution { // release memory exchangeManager.deRegisterFragmentInstanceFromMemoryPool( - instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId()); + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 62e8e939f58..5280dcc404e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -211,7 +211,7 @@ public class FragmentInstanceManager { exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift()); // clear MemoryPool exchangeManager.deRegisterFragmentInstanceFromMemoryPool( - instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId()); + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), false); } private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 188833e329e..a9ab6ed5d81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -181,7 +181,7 @@ public class MemoryPool { * @throws MemoryLeakException throw {@link MemoryLeakException} */ public void deRegisterFragmentInstanceFromQueryMemoryMap( - String queryId, String fragmentInstanceId) { + String queryId, String fragmentInstanceId, boolean forceDeregister) { Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId); if (queryRelatedMemory != null) { Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId); @@ -192,6 +192,13 @@ public class MemoryPool { hasPotentialMemoryLeak = fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0L); } + if (!forceDeregister && hasPotentialMemoryLeak) { + // If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles + // have not been closed. + // We should wait for them to be closed. Make sure this method is called again with + // forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed. + return; + } synchronized (queryMemoryReservations) { queryRelatedMemory.remove(fragmentInstanceId); if (queryRelatedMemory.isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 40a188e3d6a..8cb8ea48ea8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -365,7 +365,8 @@ public class QueryExecution implements IQueryExecution { .deRegisterFragmentInstanceFromMemoryPool( fragmentInstanceId.queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( - fragmentInstanceId)); + fragmentInstanceId), + true); } }
