This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch memoryLeak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a2ff6d920effb71add7af06b0182146bac6b481
Author: lancelly <[email protected]>
AuthorDate: Wed Jun 19 18:20:23 2024 +0800

    add forceDeregister
---
 .../queryengine/execution/exchange/MPPDataExchangeManager.java   | 5 +++--
 .../execution/fragment/FragmentInstanceExecution.java            | 2 +-
 .../queryengine/execution/fragment/FragmentInstanceManager.java  | 2 +-
 .../apache/iotdb/db/queryengine/execution/memory/MemoryPool.java | 9 ++++++++-
 .../iotdb/db/queryengine/plan/execution/QueryExecution.java      | 3 ++-
 5 files changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index 8f46e1945bc..ac0b4111090 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -603,10 +603,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     return mppDataExchangeService;
   }
 
-  public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String 
fragmentInstanceId) {
+  public void deRegisterFragmentInstanceFromMemoryPool(
+      String queryId, String fragmentInstanceId, boolean forceDeregister) {
     localMemoryManager
         .getQueryPool()
-        .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, 
fragmentInstanceId);
+        .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, 
fragmentInstanceId, forceDeregister);
   }
 
   public LocalMemoryManager getLocalMemoryManager() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index d4ba3abd969..b31d4e13eeb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -309,7 +309,7 @@ public class FragmentInstanceExecution {
 
             // release memory
             exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
-                instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId());
+                instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
 
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 62e8e939f58..5280dcc404e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -211,7 +211,7 @@ public class FragmentInstanceManager {
     exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
     // clear MemoryPool
     exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
-        instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
+        instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), 
false);
   }
 
   private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId 
queryId, int dataNodeFINum) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 188833e329e..a9ab6ed5d81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -181,7 +181,7 @@ public class MemoryPool {
    * @throws MemoryLeakException throw {@link MemoryLeakException}
    */
   public void deRegisterFragmentInstanceFromQueryMemoryMap(
-      String queryId, String fragmentInstanceId) {
+      String queryId, String fragmentInstanceId, boolean forceDeregister) {
     Map<String, Map<String, Long>> queryRelatedMemory = 
queryMemoryReservations.get(queryId);
     if (queryRelatedMemory != null) {
       Map<String, Long> fragmentRelatedMemory = 
queryRelatedMemory.get(fragmentInstanceId);
@@ -192,6 +192,13 @@ public class MemoryPool {
         hasPotentialMemoryLeak =
             fragmentRelatedMemory.values().stream().anyMatch(value -> value != 
0L);
       }
+      if (!forceDeregister && hasPotentialMemoryLeak) {
+        // If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles
+        // have not been closed.
+        // We should wait for them to be closed. Make sure this method is called again with
+        // forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed.
+        return;
+      }
       synchronized (queryMemoryReservations) {
         queryRelatedMemory.remove(fragmentInstanceId);
         if (queryRelatedMemory.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 40a188e3d6a..8cb8ea48ea8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -365,7 +365,8 @@ public class QueryExecution implements IQueryExecution {
           .deRegisterFragmentInstanceFromMemoryPool(
               fragmentInstanceId.queryId,
               
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
-                  fragmentInstanceId));
+                  fragmentInstanceId),
+              true);
     }
   }
 

Reply via email to