This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e88ea3ac8 Recreate new TaskInstance Working Directory when exist in
worker (#15358)
0e88ea3ac8 is described below
commit 0e88ea3ac8435391adf605f7d54c688e7b498687
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Dec 25 16:00:41 2023 +0800
Recreate new TaskInstance Working Directory when exist in worker (#15358)
---
.../server/worker/runner/WorkerTaskExecutor.java | 2 +-
.../worker/utils/TaskExecutionContextUtils.java | 35 +++++++++++++---------
2 files changed, 22 insertions(+), 15 deletions(-)
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index c335f34994..e47a28d264 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -217,7 +217,7 @@ public abstract class WorkerTaskExecutor implements
Runnable {
taskExecutionContext.setTenantCode(tenant);
log.info("TenantCode: {} check successfully",
taskExecutionContext.getTenantCode());
-
TaskExecutionContextUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
+
TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
log.info("WorkflowInstanceExecDir: {} check successfully",
taskExecutionContext.getExecutePath());
TaskChannel taskChannel =
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java
index bbe3b1ab4b..dcfeec4e1c 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java
@@ -80,22 +80,29 @@ public class TaskExecutionContextUtils {
}
}
- public static void createProcessLocalPathIfAbsent(TaskExecutionContext
taskExecutionContext) throws TaskException {
+ public static void createTaskInstanceWorkingDirectory(TaskExecutionContext
taskExecutionContext) throws TaskException {
+ // local execute path
+ String taskInstanceWorkingDirectory = FileUtils.getProcessExecDir(
+ taskExecutionContext.getTenantCode(),
+ taskExecutionContext.getProjectCode(),
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
try {
- // local execute path
- String execLocalPath = FileUtils.getProcessExecDir(
- taskExecutionContext.getTenantCode(),
- taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- taskExecutionContext.setExecutePath(execLocalPath);
-
taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath));
- Path executePath =
Paths.get(taskExecutionContext.getExecutePath());
- FileUtils.createDirectoryIfNotPresent(executePath);
+ Path path = Paths.get(taskInstanceWorkingDirectory);
+ if (Files.deleteIfExists(path)) {
+ log.warn("The TaskInstance WorkingDirectory: {} is exist, will
recreate again",
+ taskInstanceWorkingDirectory);
+ }
+ Files.createDirectories(path);
+ taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory);
+
+ taskExecutionContext.setExecutePath(taskInstanceWorkingDirectory);
+
taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(taskInstanceWorkingDirectory));
} catch (Throwable ex) {
- throw new TaskException("Cannot create process execute dir", ex);
+ throw new TaskException(
+ "Cannot create TaskInstance WorkingDirectory: " +
taskInstanceWorkingDirectory + " failed", ex);
}
}