This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e9290340c89 [fix](export) remove export task executor in
TransientTaskExecutor and fix concurrency issue
(#42880)(#43051)(#43109)(#43250) (#43305)
e9290340c89 is described below
commit e9290340c899ad94c4f8c6a8964de18464321f5c
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Nov 6 13:55:27 2024 +0800
[fix](export) remove export task executor in TransientTaskExecutor and fix
concurrency issue (#42880)(#43051)(#43109)(#43250) (#43305)
cherry pick from (#42880)(#43051)(#43109)(#43250)
---
.../java/org/apache/doris/analysis/ExportStmt.java | 2 +-
.../main/java/org/apache/doris/catalog/Env.java | 8 -----
.../main/java/org/apache/doris/load/ExportJob.java | 20 ++++++-----
.../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------
.../trees/plans/commands/ExportCommand.java | 2 +-
.../doris/scheduler/disruptor/TaskHandler.java | 2 ++
.../scheduler/manager/TransientTaskManager.java | 15 +++++++-
.../scheduler/registry/ExportTaskRegister.java | 40 ----------------------
.../doris/analysis/CancelExportStmtTest.java | 1 -
9 files changed, 48 insertions(+), 72 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index a9ce85b2d3e..ba7aa50ec69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -208,7 +208,7 @@ public class ExportStmt extends StatementBase implements
NotFallbackInParser {
}
private void setJob() throws UserException {
- exportJob = new ExportJob();
+ exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
exportJob.setDbId(db.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 47caa710f5e..dcc32d8276f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -254,7 +254,6 @@ import
org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
-import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
@@ -395,7 +394,6 @@ public class Env {
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;
- private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
@@ -709,7 +707,6 @@ public class Env {
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
- this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
@@ -4418,11 +4415,6 @@ public class Env {
return this.syncJobManager;
}
-
- public ExportTaskRegister getExportTaskRegister() {
- return exportTaskRegister;
- }
-
public JobManager getJobManager() {
return jobManager;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 33418531f2c..e77b0517d95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -98,7 +98,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Data
@@ -207,9 +206,7 @@ public class ExportJob implements Writable {
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths =
Lists.newArrayList();
- private List<ExportTaskExecutor> jobExecutorList;
-
- private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor =
new ConcurrentHashMap<>();
+ private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
@@ -399,8 +396,8 @@ public class ExportJob implements Writable {
return statementBase;
}
- public List<? extends TransientTaskExecutor> getTaskExecutors() {
- return jobExecutorList;
+ public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() {
+ return Lists.newArrayList(jobExecutorList);
}
private void generateExportJobExecutor() {
@@ -690,11 +687,11 @@ public class ExportJob implements Writable {
}
// we need cancel all task
- taskIdToExecutor.keySet().forEach(id -> {
+ jobExecutorList.forEach(executor -> {
try {
-
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
+
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
} catch (JobException e) {
- LOG.warn("cancel export task {} exception: {}", id, e);
+ LOG.warn("cancel export task {} exception: {}",
executor.getId(), e);
}
});
@@ -705,10 +702,12 @@ public class ExportJob implements Writable {
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
+ jobExecutorList.clear();
if (FeConstants.runningUnitTest) {
return;
}
Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
ExportJobState.CANCELLED);
+ LOG.info("cancel export job {}", id);
}
private void exportExportJob() {
@@ -749,7 +748,10 @@ public class ExportJob implements Writable {
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
+ // Clear the jobExecutorList to release memory.
+ jobExecutorList.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
ExportJobState.FINISHED);
+ LOG.info("finish export job {}", id);
}
public void replayExportJobState(ExportJobState newState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7dbe953cf9b..49ebbfe7dcd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -67,8 +67,8 @@ public class ExportMgr {
// dbid -> <label -> job>
private Map<Long, Map<String, Long>> dbTolabelToExportJobId =
Maps.newHashMap();
- // lock for export job
- // lock is private and must use after db lock
+ // Lock protecting export jobs.
+ // Must be held when creating or cancelling an export job.
private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(true);
public ExportMgr() {
@@ -95,8 +95,6 @@ public class ExportMgr {
}
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
- long jobId = Env.getCurrentEnv().getNextId();
- job.setId(jobId);
writeLock();
try {
if (dbTolabelToExportJobId.containsKey(job.getDbId())
@@ -117,15 +115,17 @@ public class ExportMgr {
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0,
fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
- job.getTaskExecutors().forEach(executor -> {
- Long taskId =
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
- job.getTaskIdToExecutor().put(taskId, executor);
- });
Env.getCurrentEnv().getEditLog().logExportCreate(job);
+ // ATTN: Must add tasks after writing the edit log, otherwise the job may
finish before the job is added.
+ job.getCopiedTaskExecutors().forEach(executor -> {
+
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
+ });
+ LOG.info("add export job. {}", job);
+
} finally {
writeUnlock();
}
- LOG.info("add export job. {}", job);
+
}
public void cancelExportJob(CancelExportStmt stmt) throws DdlException,
AnalysisException {
@@ -142,6 +142,11 @@ public class ExportMgr {
// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME,
stmt.getDbName(), matchExportJobs);
+ // Must hold the lock to protect the export job.
+ // Because a job may be cancelled while its task executors are being generated,
+ // the cancel process may clear the task executor list at the same time,
+ // which would cause a ConcurrentModificationException.
+ writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL,
"user cancel");
@@ -150,6 +155,8 @@ public class ExportMgr {
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
+ } finally {
+ writeUnlock();
}
}
@@ -464,8 +471,9 @@ public class ExportMgr {
}
public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
- readLock();
+ writeLock();
try {
+ LOG.info("replay update export job: {}, {}",
stateTransfer.getJobId(), stateTransfer.getState());
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
job.replayExportJobState(stateTransfer.getState());
job.setStartTimeMs(stateTransfer.getStartTimeMs());
@@ -473,7 +481,7 @@ public class ExportMgr {
job.setFailMsg(stateTransfer.getFailMsg());
job.setOutfileInfo(stateTransfer.getOutFileInfo());
} finally {
- readUnlock();
+ writeUnlock();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index dbf6cf7067e..38083e406b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -242,7 +242,7 @@ public class ExportCommand extends Command implements
ForwardWithSync {
private ExportJob generateExportJob(ConnectContext ctx, Map<String,
String> fileProperties, TableName tblName)
throws UserException {
- ExportJob exportJob = new ExportJob();
+ ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
// set export job and check catalog/db/table
CatalogIf catalog =
ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index de889c1b2e4..193f8ece9f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -68,6 +68,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
taskExecutor.execute();
} catch (JobException e) {
log.warn("Memory task execute failed, taskId: {}, msg : {}",
taskId, e.getMessage());
+ } finally {
+ transientTaskManager.removeMemoryTask(taskId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 51edd4af318..7461399c8eb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -22,10 +22,13 @@ import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import lombok.Setter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
public class TransientTaskManager {
+ private static final Logger LOG =
LogManager.getLogger(TransientTaskManager.class);
/**
* key: taskId
* value: memory task executor of this task
@@ -57,10 +60,20 @@ public class TransientTaskManager {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
+ LOG.info("add memory task, taskId: {}", taskId);
return taskId;
}
public void cancelMemoryTask(Long taskId) throws JobException {
- taskExecutorMap.get(taskId).cancel();
+ try {
+ taskExecutorMap.get(taskId).cancel();
+ } finally {
+ removeMemoryTask(taskId);
+ }
+ }
+
+ public void removeMemoryTask(Long taskId) {
+ taskExecutorMap.remove(taskId);
+ LOG.info("remove memory task, taskId: {}", taskId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
deleted file mode 100644
index 0241f57fea0..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.scheduler.registry;
-
-import org.apache.doris.scheduler.exception.JobException;
-import org.apache.doris.scheduler.executor.TransientTaskExecutor;
-import org.apache.doris.scheduler.manager.TransientTaskManager;
-
-public class ExportTaskRegister implements TransientTaskRegister {
- private final TransientTaskManager transientTaskManager;
-
- public ExportTaskRegister(TransientTaskManager transientTaskManager) {
- this.transientTaskManager = transientTaskManager;
- }
-
- @Override
- public Long registerTask(TransientTaskExecutor executor) {
- return transientTaskManager.addMemoryTask(executor);
- }
-
- @Override
- public void cancelTask(Long taskId) throws JobException {
- transientTaskManager.cancelMemoryTask(taskId);
- }
-}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
index 2d188230d8b..4ff15653fa0 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -234,7 +234,6 @@ public class CancelExportStmtTest extends TestWithFeService
{
exportMgr.unprotectAddJob(job3);
exportMgr.unprotectAddJob(job4);
-
// cancel export job where state = "PENDING"
Assert.assertTrue(job1.getState() == ExportJobState.PENDING);
SlotRef stateSlotRef = new SlotRef(null, "state");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]