This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e6239e8 [DS-6829][WorkerServer] skip create log dir and print log in
dryRun model (#6852)
e6239e8 is described below
commit e6239e808b9a508675889fccb7a47dff40e4d172
Author: wind <[email protected]>
AuthorDate: Sun Nov 21 17:51:56 2021 +0800
[DS-6829][WorkerServer] skip create log dir and print log in dryRun model
(#6852)
Co-authored-by: caishunfeng <[email protected]>
---
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../worker/processor/TaskExecuteProcessor.java | 27 ++++++++-------
.../server/worker/runner/TaskExecuteThread.java | 38 +++++++++++-----------
3 files changed, 35 insertions(+), 31 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 8ca161d..6c7c106 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -766,4 +766,5 @@ public final class Constants {
* dry run flag
*/
public static final int DRY_RUN_FLAG_NO = 0;
+ public static final int DRY_RUN_FLAG_YES = 1;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3252081..8dfaf34 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -135,19 +136,21 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
- // local execute path
- String execLocalPath = getExecLocalPath(taskExecutionContext);
- logger.info("task instance local execute path : {}", execLocalPath);
- taskExecutionContext.setExecutePath(execLocalPath);
-
- try {
- FileUtils.createWorkDirIfAbsent(execLocalPath);
- if (CommonUtils.isSudoEnable() &&
workerConfig.isTenantAutoCreate()) {
-
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+ // local execute path
+ String execLocalPath = getExecLocalPath(taskExecutionContext);
+ logger.info("task instance local execute path : {}",
execLocalPath);
+ taskExecutionContext.setExecutePath(execLocalPath);
+
+ try {
+ FileUtils.createWorkDirIfAbsent(execLocalPath);
+ if (CommonUtils.isSudoEnable() &&
workerConfig.isTenantAutoCreate()) {
+
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ }
+ } catch (Throwable ex) {
+ logger.error("create execLocalPath: {}", execLocalPath, ex);
+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
- } catch (Throwable ex) {
- logger.error("create execLocalPath: {}", execLocalPath, ex);
-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 084baf8..d8f1088 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -125,8 +125,16 @@ public class TaskExecuteThread implements Runnable,
Delayed {
@Override
public void run() {
-
TaskExecuteResponseCommand responseCommand = new
TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(),
taskExecutionContext.getProcessInstanceId());
+ if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+ responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
+ responseCommand.setEndTime(new Date());
+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),
responseCommand.convert2Command(), Event.RESULT);
+
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(),
responseCommand.convert2Command());
+ return;
+ }
+
try {
logger.info("script path : {}",
taskExecutionContext.getExecutePath());
// check if the OS user exists
@@ -146,13 +154,8 @@ public class TaskExecuteThread implements Runnable,
Delayed {
}
logger.info("the task begins to execute. task instance id: {}",
taskExecutionContext.getTaskInstanceId());
- int dryRun = taskExecutionContext.getDryRun();
// copy hdfs/minio file to local
- if (dryRun == Constants.DRY_RUN_FLAG_NO) {
- downloadResource(taskExecutionContext.getExecutePath(),
- taskExecutionContext.getResources(),
- logger);
- }
+ downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(), logger);
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -177,31 +180,28 @@ public class TaskExecuteThread implements Runnable,
Delayed {
taskRequest.setTaskLogName(taskLogName);
task = taskChannel.createTask(taskRequest);
+
// task init
this.task.init();
+
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
- if (dryRun == Constants.DRY_RUN_FLAG_NO) {
- // task handle
- this.task.handle();
+ // task handle
+ this.task.handle();
- // task result process
- if (this.task.getNeedAlert()) {
- sendAlert(this.task.getTaskAlertInfo());
- }
- responseCommand.setStatus(this.task.getExitStatus().getCode());
- } else {
- responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
- task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
+ // task result process
+ if (this.task.getNeedAlert()) {
+ sendAlert(this.task.getTaskAlertInfo());
}
+
+ responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
-
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());