This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de2ce48b992 [fix](Export) Fix the problem of exporting stuck (#44944)
de2ce48b992 is described below
commit de2ce48b992d12237f2d3aeb6b0461f6770e16c3
Author: Tiewei Fang <[email protected]>
AuthorDate: Fri Dec 6 13:02:58 2024 +0800
[fix](Export) Fix the problem of exporting stuck (#44944)
### What problem does this PR solve?
Problem Summary:
When the ring buffer does not have enough free capacity, the `disruptor`
blocks while waiting for a slot. Because `addExportJobAndRegisterTask`
publishes its tasks while still holding the `ExportMgr` write lock, that
lock is never released, so no other method can acquire it and exports get stuck.
---
.../main/java/org/apache/doris/load/ExportMgr.java | 30 ++++++++++------------
.../doris/scheduler/disruptor/TaskDisruptor.java | 11 +++++---
.../scheduler/manager/TransientTaskManager.java | 4 +--
3 files changed, 22 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index eddd5fb27ee..94ae436ee6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -108,26 +108,24 @@ public class ExportMgr {
}
}
unprotectAddJob(job);
- // delete existing files
- if (Config.enable_delete_existing_files &&
Boolean.parseBoolean(job.getDeleteExistingFiles())) {
- if (job.getBrokerDesc() == null) {
- throw new AnalysisException("Local file system does not
support delete existing files");
- }
- String fullPath = job.getExportPath();
- BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0,
fullPath.lastIndexOf('/') + 1),
- job.getBrokerDesc());
- }
Env.getCurrentEnv().getEditLog().logExportCreate(job);
- // ATTN: Must add task after edit log, otherwise the job may
finish before adding job.
- job.getCopiedTaskExecutors().forEach(executor -> {
-
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
- });
- LOG.info("add export job. {}", job);
-
} finally {
writeUnlock();
}
-
+ // delete existing files
+ if (Config.enable_delete_existing_files &&
Boolean.parseBoolean(job.getDeleteExistingFiles())) {
+ if (job.getBrokerDesc() == null) {
+ throw new AnalysisException("Local file system does not
support delete existing files");
+ }
+ String fullPath = job.getExportPath();
+ BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0,
fullPath.lastIndexOf('/') + 1),
+ job.getBrokerDesc());
+ }
+ // ATTN: Must add task after edit log, otherwise the job may finish
before adding job.
+ for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
+
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(job.getCopiedTaskExecutors().get(i));
+ }
+ LOG.info("add export job. {}", job);
}
public void cancelExportJob(CancelExportStmt stmt) throws DdlException,
AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 345b31d6bc2..8144ca22ea2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -20,6 +20,7 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
+import org.apache.doris.scheduler.exception.JobException;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
@@ -119,15 +120,17 @@ public class TaskDisruptor implements Closeable {
*
* @param taskId task id
*/
- public void tryPublishTask(Long taskId) {
+ public void tryPublishTask(Long taskId) throws JobException {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, taskId: {}",
taskId);
return;
}
- try {
+ // We reserve two slots in the ring buffer
+ // to prevent it from becoming stuck due to competition between
producers and consumers.
+ if (disruptor.getRingBuffer().hasAvailableCapacity(2)) {
disruptor.publishEvent(TRANSLATOR, taskId, 0L,
TaskType.TRANSIENT_TASK);
- } catch (Exception e) {
- log.warn("tryPublish failed, taskId: {}", taskId, e);
+ } else {
+ throw new JobException("There is not enough available capacity in
the RingBuffer.");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 7461399c8eb..de501d3e0c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -21,7 +21,6 @@ import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
-import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,7 +40,6 @@ public class TransientTaskManager {
* disruptor is used to handle task
* disruptor will start a thread pool to handle task
*/
- @Setter
private TaskDisruptor disruptor;
public TransientTaskManager() {
@@ -56,7 +54,7 @@ public class TransientTaskManager {
return taskExecutorMap.get(taskId);
}
- public Long addMemoryTask(TransientTaskExecutor executor) {
+ public Long addMemoryTask(TransientTaskExecutor executor) throws
JobException {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]