This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryMemoryLeaky in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4696cfd24d23dd3dcc8ba17b3a630a8855db1a01 Author: JackieTien97 <[email protected]> AuthorDate: Tue Nov 8 20:07:47 2022 +0800 Fix memory leak in query --- .../fragment/FragmentInstanceExecution.java | 4 +++ .../fragment/FragmentInstanceManager.java | 30 +++++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index 7a4841bc45..12e5da9231 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -93,6 +93,10 @@ public class FragmentInstanceExecution { stateMachine.getState(), context.getEndTime(), context.getFailedCause()); } + public FragmentInstanceStateMachine getStateMachine() { + return stateMachine; + } + // this is a separate method to ensure that the `this` reference is not leaked during construction private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) { requireNonNull(failedInstances, "failedInstances is null"); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 1d7499560f..9f95052392 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -80,7 +80,7 @@ public class FragmentInstanceManager { this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification"); - this.infoCacheTime = new Duration(15, TimeUnit.MINUTES); + this.infoCacheTime = new Duration(5, TimeUnit.MINUTES); 
ScheduledExecutorUtil.safelyScheduleWithFixedDelay( instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS); @@ -134,7 +134,19 @@ public class FragmentInstanceManager { } }); - return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId); + if (execution != null) { + execution + .getStateMachine() + .addStateChangeListener( + newState -> { + if (newState.isDone()) { + instanceExecution.remove(instanceId); + } + }); + return execution.getInstanceInfo(); + } else { + return createFailedInstanceInfo(instanceId); + } } } @@ -172,7 +184,19 @@ public class FragmentInstanceManager { return null; } }); - return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId); + if (execution != null) { + execution + .getStateMachine() + .addStateChangeListener( + newState -> { + if (newState.isDone()) { + instanceExecution.remove(instanceId); + } + }); + return execution.getInstanceInfo(); + } else { + return createFailedInstanceInfo(instanceId); + } } /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */
