This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 36dd4890577 Add clean logic for FragmentInstance in case that callback is not added. (#12768)
36dd4890577 is described below

commit 36dd48905772121d2fc14621f9c1d6abaa1a997f
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Jun 19 21:49:54 2024 +0800

    Add clean logic for FragmentInstance in case that callback is not added. (#12768)
---
 .../execution/exchange/MPPDataExchangeManager.java        | 15 +++++++++------
 .../execution/fragment/FragmentInstanceExecution.java     |  2 +-
 .../execution/fragment/FragmentInstanceManager.java       | 10 ++++++++++
 .../iotdb/db/queryengine/execution/memory/MemoryPool.java |  9 ++++++++-
 .../db/queryengine/plan/execution/QueryExecution.java     |  3 ++-
 5 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index a38ef923bbb..ac0b4111090 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -585,11 +585,13 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       ExecutorService executorService,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
-    this.localMemoryManager = Validate.notNull(localMemoryManager);
-    this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
-    this.executorService = Validate.notNull(executorService);
+    this.localMemoryManager = Validate.notNull(localMemoryManager, 
"localMemoryManager is null.");
+    this.tsBlockSerdeFactory =
+        Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null.");
+    this.executorService = Validate.notNull(executorService, "executorService 
is null.");
     this.mppDataExchangeServiceClientManager =
-        Validate.notNull(mppDataExchangeServiceClientManager);
+        Validate.notNull(
+            mppDataExchangeServiceClientManager, 
"mppDataExchangeServiceClientManager is null.");
     sourceHandles = new ConcurrentHashMap<>();
     shuffleSinkHandles = new ConcurrentHashMap<>();
   }
@@ -601,10 +603,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     return mppDataExchangeService;
   }
 
-  public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String 
fragmentInstanceId) {
+  public void deRegisterFragmentInstanceFromMemoryPool(
+      String queryId, String fragmentInstanceId, boolean forceDeregister) {
     localMemoryManager
         .getQueryPool()
-        .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, 
fragmentInstanceId);
+        .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, 
fragmentInstanceId, forceDeregister);
   }
 
   public LocalMemoryManager getLocalMemoryManager() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index d4ba3abd969..b31d4e13eeb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -309,7 +309,7 @@ public class FragmentInstanceExecution {
 
             // release memory
             exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
-                instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId());
+                instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
 
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index bea961c7525..900865dc887 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -177,6 +177,7 @@ public class FragmentInstanceManager {
                       instance.isExplainAnalyze(),
                       exchangeManager);
                 } catch (Throwable t) {
+                  clearFIRelatedResources(instanceId);
                   logger.warn("error when create FragmentInstanceExecution.", 
t);
                   stateMachine.failed(t);
                   return null;
@@ -205,6 +206,14 @@ public class FragmentInstanceManager {
     }
   }
 
+  private void clearFIRelatedResources(FragmentInstanceId instanceId) {
+    // close and remove all the handles of the fragment instance
+    exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
+    // clear MemoryPool
+    exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
+        instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), 
false);
+  }
+
   private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId 
queryId, int dataNodeFINum) {
     return dataNodeQueryContextMap.computeIfAbsent(
         queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum));
@@ -249,6 +258,7 @@ public class FragmentInstanceManager {
                     false,
                     exchangeManager);
               } catch (Throwable t) {
+                clearFIRelatedResources(instanceId);
                 logger.warn("Execute error caused by ", t);
                 stateMachine.failed(t);
                 return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 188833e329e..a9ab6ed5d81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -181,7 +181,7 @@ public class MemoryPool {
    * @throws MemoryLeakException throw {@link MemoryLeakException}
    */
   public void deRegisterFragmentInstanceFromQueryMemoryMap(
-      String queryId, String fragmentInstanceId) {
+      String queryId, String fragmentInstanceId, boolean forceDeregister) {
     Map<String, Map<String, Long>> queryRelatedMemory = 
queryMemoryReservations.get(queryId);
     if (queryRelatedMemory != null) {
       Map<String, Long> fragmentRelatedMemory = 
queryRelatedMemory.get(fragmentInstanceId);
@@ -192,6 +192,13 @@ public class MemoryPool {
         hasPotentialMemoryLeak =
             fragmentRelatedMemory.values().stream().anyMatch(value -> value != 
0L);
       }
+      if (!forceDeregister && hasPotentialMemoryLeak) {
+        // If hasPotentialMemoryLeak is true, it means that 
LocalSourceChannel/LocalSourceHandles
+        // have not been closed.
+        // We should wait for them to be closed. Make sure this method is 
called again with
+        // forceDeregister == true, after all 
LocalSourceChannel/LocalSourceHandles are closed.
+        return;
+      }
       synchronized (queryMemoryReservations) {
         queryRelatedMemory.remove(fragmentInstanceId);
         if (queryRelatedMemory.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 40a188e3d6a..8cb8ea48ea8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -365,7 +365,8 @@ public class QueryExecution implements IQueryExecution {
           .deRegisterFragmentInstanceFromMemoryPool(
               fragmentInstanceId.queryId,
               
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
-                  fragmentInstanceId));
+                  fragmentInstanceId),
+              true);
     }
   }
 

Reply via email to