This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0f9c2c2f0ad [fix](export) remove export task executor in
TransientTaskExecutor (#42880)
0f9c2c2f0ad is described below
commit 0f9c2c2f0ad0f7e26d42e475725233747eeb0c75
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Oct 31 09:57:40 2024 +0800
[fix](export) remove export task executor in TransientTaskExecutor (#42880)
### What problem does this PR solve?
Problem Summary:
There is a memory leak on the FE side when `export` commands are run
continuously.
The `ExportTaskExecutor` instances are added to
`TransientTaskManager` and are never removed.
This PR mainly changes:
1. Remove the unused `ExportTaskRegister` from `Env`.
2. Remove `ExportTaskExecutor` from `TransientTaskManager` once the export
job is finished or cancelled.
### Release note
Fix an FE memory leak by removing `ExportTaskExecutor` instances from
`TransientTaskManager` once the export job is finished or cancelled.
---
.../main/java/org/apache/doris/catalog/Env.java | 8 -----
.../main/java/org/apache/doris/load/ExportJob.java | 14 ++++----
.../main/java/org/apache/doris/load/ExportMgr.java | 3 +-
.../doris/scheduler/disruptor/TaskHandler.java | 2 ++
.../scheduler/manager/TransientTaskManager.java | 15 +++++++-
.../scheduler/registry/ExportTaskRegister.java | 40 ----------------------
.../doris/analysis/CancelExportStmtTest.java | 1 -
7 files changed, 24 insertions(+), 59 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0332b45f265..03679d64330 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -251,7 +251,6 @@ import
org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
-import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
@@ -388,7 +387,6 @@ public class Env {
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;
- private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
@@ -695,7 +693,6 @@ public class Env {
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
- this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
@@ -4228,11 +4225,6 @@ public class Env {
return this.syncJobManager;
}
-
- public ExportTaskRegister getExportTaskRegister() {
- return exportTaskRegister;
- }
-
public JobManager getJobManager() {
return jobManager;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 303887875eb..2fe07c6bf0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -99,7 +99,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Data
@@ -208,9 +207,7 @@ public class ExportJob implements Writable {
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths =
Lists.newArrayList();
- private List<ExportTaskExecutor> jobExecutorList;
-
- private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor =
new ConcurrentHashMap<>();
+ private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
@@ -692,11 +689,11 @@ public class ExportJob implements Writable {
}
// we need cancel all task
- taskIdToExecutor.keySet().forEach(id -> {
+ jobExecutorList.forEach(executor -> {
try {
-
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
+
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
} catch (JobException e) {
- LOG.warn("cancel export task {} exception: {}", id, e);
+ LOG.warn("cancel export task {} exception: {}",
executor.getId(), e);
}
});
@@ -707,6 +704,7 @@ public class ExportJob implements Writable {
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
+ jobExecutorList.clear();
if (FeConstants.runningUnitTest) {
return;
}
@@ -751,6 +749,8 @@ public class ExportJob implements Writable {
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
+ // Clear the jobExecutorList to release memory.
+ jobExecutorList.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
ExportJobState.FINISHED);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7439fd89aa4..09eb8ea5eed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -114,8 +114,7 @@ public class ExportMgr {
job.getBrokerDesc());
}
job.getTaskExecutors().forEach(executor -> {
- Long taskId =
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
- job.getTaskIdToExecutor().put(taskId, executor);
+
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index de889c1b2e4..193f8ece9f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -68,6 +68,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
taskExecutor.execute();
} catch (JobException e) {
log.warn("Memory task execute failed, taskId: {}, msg : {}",
taskId, e.getMessage());
+ } finally {
+ transientTaskManager.removeMemoryTask(taskId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 51edd4af318..7461399c8eb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -22,10 +22,13 @@ import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import lombok.Setter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
public class TransientTaskManager {
+ private static final Logger LOG =
LogManager.getLogger(TransientTaskManager.class);
/**
* key: taskId
* value: memory task executor of this task
@@ -57,10 +60,20 @@ public class TransientTaskManager {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
+ LOG.info("add memory task, taskId: {}", taskId);
return taskId;
}
public void cancelMemoryTask(Long taskId) throws JobException {
- taskExecutorMap.get(taskId).cancel();
+ try {
+ taskExecutorMap.get(taskId).cancel();
+ } finally {
+ removeMemoryTask(taskId);
+ }
+ }
+
+ public void removeMemoryTask(Long taskId) {
+ taskExecutorMap.remove(taskId);
+ LOG.info("remove memory task, taskId: {}", taskId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
deleted file mode 100644
index 0241f57fea0..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.scheduler.registry;
-
-import org.apache.doris.scheduler.exception.JobException;
-import org.apache.doris.scheduler.executor.TransientTaskExecutor;
-import org.apache.doris.scheduler.manager.TransientTaskManager;
-
-public class ExportTaskRegister implements TransientTaskRegister {
- private final TransientTaskManager transientTaskManager;
-
- public ExportTaskRegister(TransientTaskManager transientTaskManager) {
- this.transientTaskManager = transientTaskManager;
- }
-
- @Override
- public Long registerTask(TransientTaskExecutor executor) {
- return transientTaskManager.addMemoryTask(executor);
- }
-
- @Override
- public void cancelTask(Long taskId) throws JobException {
- transientTaskManager.cancelMemoryTask(taskId);
- }
-}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
index 2d188230d8b..4ff15653fa0 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -234,7 +234,6 @@ public class CancelExportStmtTest extends TestWithFeService
{
exportMgr.unprotectAddJob(job3);
exportMgr.unprotectAddJob(job4);
-
// cancel export job where state = "PENDING"
Assert.assertTrue(job1.getState() == ExportJobState.PENDING);
SlotRef stateSlotRef = new SlotRef(null, "state");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]