This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch memoryLeak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11ec38ec7e7d9ac454cf82adf5cef5a065931d93
Author: lancelly <[email protected]>
AuthorDate: Wed Jun 19 16:53:26 2024 +0800

    fix
---
 .../execution/exchange/MPPDataExchangeManager.java  | 21 +++++++++++++++++----
 .../execution/fragment/FragmentInstanceManager.java | 21 ++++++++++-----------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index a38ef923bbb..49c17b54d74 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -585,11 +585,13 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       ExecutorService executorService,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
-    this.localMemoryManager = Validate.notNull(localMemoryManager);
-    this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
-    this.executorService = Validate.notNull(executorService);
+    this.localMemoryManager = Validate.notNull(localMemoryManager, 
"localMemoryManager is null.");
+    this.tsBlockSerdeFactory =
+        Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null.");
+    this.executorService = Validate.notNull(executorService, "executorService 
is null.");
     this.mppDataExchangeServiceClientManager =
-        Validate.notNull(mppDataExchangeServiceClientManager);
+        Validate.notNull(
+            mppDataExchangeServiceClientManager, 
"mppDataExchangeServiceClientManager is null.");
     sourceHandles = new ConcurrentHashMap<>();
     shuffleSinkHandles = new ConcurrentHashMap<>();
   }
@@ -607,6 +609,17 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, 
fragmentInstanceId);
   }
 
+  public void removeSourceHandlesAndShuffleSinkHandlesOfFI(TFragmentInstanceId 
fragmentInstanceId) {
+    Map<String, ISourceHandle> removedSourceHandles = 
sourceHandles.remove(fragmentInstanceId);
+    if (removedSourceHandles != null) {
+      removedSourceHandles.values().forEach(ISourceHandle::abort);
+    }
+    ISinkHandle shuffleSinkHandle = 
shuffleSinkHandles.remove(fragmentInstanceId);
+    if (shuffleSinkHandle != null) {
+      shuffleSinkHandle.abort();
+    }
+  }
+
   public LocalMemoryManager getLocalMemoryManager() {
     return localMemoryManager;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index bea961c7525..71cb0ab0f98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -177,6 +177,7 @@ public class FragmentInstanceManager {
                       instance.isExplainAnalyze(),
                       exchangeManager);
                 } catch (Throwable t) {
+                  clearFIRelatedResources(instanceId);
                   logger.warn("error when create FragmentInstanceExecution.", 
t);
                   stateMachine.failed(t);
                   return null;
@@ -205,6 +206,14 @@ public class FragmentInstanceManager {
     }
   }
 
+  private void clearFIRelatedResources(FragmentInstanceId instanceId) {
+    // close and remove all the handles of the fragment instance
+    
exchangeManager.removeSourceHandlesAndShuffleSinkHandlesOfFI(instanceId.toThrift());
+    // clear MemoryPool
+    exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
+        instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
+  }
+
   private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId 
queryId, int dataNodeFINum) {
     return dataNodeQueryContextMap.computeIfAbsent(
         queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum));
@@ -249,6 +258,7 @@ public class FragmentInstanceManager {
                     false,
                     exchangeManager);
               } catch (Throwable t) {
+                clearFIRelatedResources(instanceId);
                 logger.warn("Execute error caused by ", t);
                 stateMachine.failed(t);
                 return null;
@@ -269,17 +279,6 @@ public class FragmentInstanceManager {
     }
   }
 
-  /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state 
tracking */
-  public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId 
fragmentInstanceId) {
-    instanceExecution.remove(fragmentInstanceId);
-    FragmentInstanceContext context = instanceContext.get(fragmentInstanceId);
-    if (context != null) {
-      context.abort();
-      return context.getInstanceInfo();
-    }
-    return null;
-  }
-
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId, 
boolean hasThrowable) {
     logger.debug("[CancelFI]");

Reply via email to