This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 36dd4890577 Add clean logic for FragmentInstance in case that callback
is not added. (#12768)
36dd4890577 is described below
commit 36dd48905772121d2fc14621f9c1d6abaa1a997f
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Jun 19 21:49:54 2024 +0800
Add clean logic for FragmentInstance in case that callback is not added.
(#12768)
---
.../execution/exchange/MPPDataExchangeManager.java | 15 +++++++++------
.../execution/fragment/FragmentInstanceExecution.java | 2 +-
.../execution/fragment/FragmentInstanceManager.java | 10 ++++++++++
.../iotdb/db/queryengine/execution/memory/MemoryPool.java | 9 ++++++++-
.../db/queryengine/plan/execution/QueryExecution.java | 3 ++-
5 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index a38ef923bbb..ac0b4111090 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -585,11 +585,13 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
ExecutorService executorService,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
- this.localMemoryManager = Validate.notNull(localMemoryManager);
- this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
- this.executorService = Validate.notNull(executorService);
+ this.localMemoryManager = Validate.notNull(localMemoryManager, "localMemoryManager is null.");
+ this.tsBlockSerdeFactory =
+ Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null.");
+ this.executorService = Validate.notNull(executorService, "executorService is null.");
this.mppDataExchangeServiceClientManager =
- Validate.notNull(mppDataExchangeServiceClientManager);
+ Validate.notNull(
+ mppDataExchangeServiceClientManager, "mppDataExchangeServiceClientManager is null.");
sourceHandles = new ConcurrentHashMap<>();
shuffleSinkHandles = new ConcurrentHashMap<>();
}
@@ -601,10 +603,11 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
return mppDataExchangeService;
}
- public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String
fragmentInstanceId) {
+ public void deRegisterFragmentInstanceFromMemoryPool(
+ String queryId, String fragmentInstanceId, boolean forceDeregister) {
localMemoryManager
.getQueryPool()
- .deRegisterFragmentInstanceFromQueryMemoryMap(queryId,
fragmentInstanceId);
+ .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId, forceDeregister);
}
public LocalMemoryManager getLocalMemoryManager() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index d4ba3abd969..b31d4e13eeb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -309,7 +309,7 @@ public class FragmentInstanceExecution {
// release memory
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
- instanceId.getQueryId().getId(),
instanceId.getFragmentInstanceId());
+ instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true);
if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index bea961c7525..900865dc887 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -177,6 +177,7 @@ public class FragmentInstanceManager {
instance.isExplainAnalyze(),
exchangeManager);
} catch (Throwable t) {
+ clearFIRelatedResources(instanceId);
logger.warn("error when create FragmentInstanceExecution.",
t);
stateMachine.failed(t);
return null;
@@ -205,6 +206,14 @@ public class FragmentInstanceManager {
}
}
+ private void clearFIRelatedResources(FragmentInstanceId instanceId) {
+ // close and remove all the handles of the fragment instance
+ exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
+ // clear MemoryPool
+ exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
+ instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), false);
+ }
+
private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId
queryId, int dataNodeFINum) {
return dataNodeQueryContextMap.computeIfAbsent(
queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum));
@@ -249,6 +258,7 @@ public class FragmentInstanceManager {
false,
exchangeManager);
} catch (Throwable t) {
+ clearFIRelatedResources(instanceId);
logger.warn("Execute error caused by ", t);
stateMachine.failed(t);
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 188833e329e..a9ab6ed5d81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -181,7 +181,7 @@ public class MemoryPool {
* @throws MemoryLeakException throw {@link MemoryLeakException}
*/
public void deRegisterFragmentInstanceFromQueryMemoryMap(
- String queryId, String fragmentInstanceId) {
+ String queryId, String fragmentInstanceId, boolean forceDeregister) {
Map<String, Map<String, Long>> queryRelatedMemory =
queryMemoryReservations.get(queryId);
if (queryRelatedMemory != null) {
Map<String, Long> fragmentRelatedMemory =
queryRelatedMemory.get(fragmentInstanceId);
@@ -192,6 +192,13 @@ public class MemoryPool {
hasPotentialMemoryLeak =
fragmentRelatedMemory.values().stream().anyMatch(value -> value !=
0L);
}
+ if (!forceDeregister && hasPotentialMemoryLeak) {
+ // If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles
+ // have not been closed.
+ // We should wait for them to be closed. Make sure this method is called again with
+ // forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed.
+ return;
+ }
synchronized (queryMemoryReservations) {
queryRelatedMemory.remove(fragmentInstanceId);
if (queryRelatedMemory.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 40a188e3d6a..8cb8ea48ea8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -365,7 +365,8 @@ public class QueryExecution implements IQueryExecution {
.deRegisterFragmentInstanceFromMemoryPool(
fragmentInstanceId.queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
- fragmentInstanceId));
+ fragmentInstanceId),
+ true);
}
}