This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 2bb5eba add TaskInstanceCacheManager receive Worker report
result,modify master polling db transfrom to cache (#2021)
2bb5eba is described below
commit 2bb5ebaf3a20297b3cfa4d8e046f9a6479e72d5a
Author: qiaozhanwei <[email protected]>
AuthorDate: Wed Feb 26 18:46:05 2020 +0800
add TaskInstanceCacheManager receive Worker report result,modify master
polling db transfrom to cache (#2021)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
---
.../dolphinscheduler/common/utils/CommonUtils.java | 2 +-
.../dolphinscheduler/dao/entity/TaskInstance.java | 22 +--
.../remote/command/ExecuteTaskResponseCommand.java | 2 +-
.../remote/entity/TaskExecutionContext.java | 15 +-
.../builder/TaskExecutionContextBuilder.java | 1 +
.../master/cache/TaskInstanceCacheManager.java | 64 ++++++
.../cache/impl/TaskInstanceCacheManagerImpl.java | 115 +++++++++++
.../server/master/processor/TaskAckProcessor.java | 13 +-
.../master/processor/TaskResponseProcessor.java | 18 +-
.../master/runner/MasterBaseTaskExecThread.java | 79 ++++++--
.../server/master/runner/MasterExecThread.java | 4 +-
.../server/master/runner/MasterTaskExecThread.java | 26 ++-
.../worker/processor/TaskExecuteProcessor.java | 1 +
.../server/worker/processor/TaskKillProcessor.java | 19 ++
.../server/worker/runner/TaskExecuteThread.java | 8 +-
.../worker/task/AbstractCommandExecutor.java | 214 ++++++++++-----------
.../server/worker/task/AbstractTask.java | 30 ++-
.../server/worker/task/AbstractYarnTask.java | 20 +-
.../server/worker/task/CommandExecuteResult.java | 69 +++++++
.../server/worker/task/PythonCommandExecutor.java | 13 +-
.../server/worker/task/ShellCommandExecutor.java | 17 +-
.../server/worker/task/TaskProps.java | 84 +++++---
.../server/worker/task/datax/DataxTask.java | 20 +-
.../worker/task/dependent/DependentTask.java | 2 +-
.../server/worker/task/flink/FlinkTask.java | 2 +-
.../server/worker/task/http/HttpTask.java | 2 +-
.../server/worker/task/python/PythonTask.java | 21 +-
.../server/worker/task/shell/ShellTask.java | 16 +-
.../server/worker/task/sql/SqlTask.java | 8 +-
.../worker/shell/ShellCommandExecutorTest.java | 4 +-
.../server/worker/sql/SqlExecutorTest.java | 6 +-
.../server/worker/task/datax/DataxTaskTest.java | 12 +-
.../worker/task/dependent/DependentTaskTest.java | 2 +-
.../conf/home/pages/dag/_source/formModel/log.vue | 4 +-
34 files changed, 679 insertions(+), 256 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
index b4b89bf..d15ede2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
@@ -26,7 +26,7 @@ import java.io.File;
/**
* common utils
*/
-public class CommonUtils {
+public class CommonUtils {
private CommonUtils() {
throw new IllegalStateException("CommonUtils class");
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 428f5d4..92cb3af 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -462,6 +462,14 @@ public class TaskInstance implements Serializable {
this.workerGroupId = workerGroupId;
}
+ public String getDependentResult() {
+ return dependentResult;
+ }
+
+ public void setDependentResult(String dependentResult) {
+ this.dependentResult = dependentResult;
+ }
+
@Override
public String toString() {
return "TaskInstance{" +
@@ -481,27 +489,19 @@ public class TaskInstance implements Serializable {
", logPath='" + logPath + '\'' +
", retryTimes=" + retryTimes +
", alertFlag=" + alertFlag +
- ", flag=" + flag +
", processInstance=" + processInstance +
", processDefine=" + processDefine +
", pid=" + pid +
", appLink='" + appLink + '\'' +
", flag=" + flag +
- ", dependency=" + dependency +
+ ", dependency='" + dependency + '\'' +
", duration=" + duration +
", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority +
- ", workGroupId=" + workerGroupId +
+ ", dependentResult='" + dependentResult + '\'' +
+ ", workerGroupId=" + workerGroupId +
'}';
}
-
- public String getDependentResult() {
- return dependentResult;
- }
-
- public void setDependentResult(String dependentResult) {
- this.dependentResult = dependentResult;
- }
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 6bbc2f7..d8253a8 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command
convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task
response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command
convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskResponseCommand{" +
"taskInstanceId=" + taskInstanceId +
", status=" + status +
", endTime=" + endTime +
'}';
}
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index e3da43a..711e3d8 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -52,11 +52,15 @@ public class TaskExecutionContext implements Serializable{
private String executePath;
/**
+ * log path
+ */
+ private String logPath;
+
+ /**
* task json
*/
private String taskJson;
-
/**
* process instance id
*/
@@ -228,6 +232,14 @@ public class TaskExecutionContext implements Serializable{
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
@Override
public String toString() {
return "TaskExecutionContext{" +
@@ -236,6 +248,7 @@ public class TaskExecutionContext implements Serializable{
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
", executePath='" + executePath + '\'' +
+ ", logPath='" + logPath + '\'' +
", taskJson='" + taskJson + '\'' +
", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime +
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index a3ddd29..8cdd13e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -44,6 +44,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
+ taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
return this;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
new file mode 100644
index 0000000..98d2a24
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cache;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+/**
+ * task instance state manager
+ */
+public interface TaskInstanceCacheManager {
+
+ /**
+ * get taskInstance by taskInstance id
+ *
+ * @param taskInstanceId taskInstanceId
+ * @return taskInstance
+ */
+ TaskInstance getByTaskInstanceId(Integer taskInstanceId);
+
+ /**
+ * cache taskInstance
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
+
+ /**
+ * cache taskInstance
+ *
+ * @param taskAckCommand taskAckCommand
+ */
+ void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
+
+ /**
+ * cache taskInstance
+ *
+ * @param executeTaskResponseCommand executeTaskResponseCommand
+ */
+ void cacheTaskInstance(ExecuteTaskResponseCommand
executeTaskResponseCommand);
+
+ /**
+ * remove taskInstance by taskInstanceId
+ * @param taskInstanceId taskInstanceId
+ */
+ void removeByTaskInstanceId(Integer taskInstanceId);
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
new file mode 100644
index 0000000..634a6a9
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.cache.impl;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * taskInstance state manager
+ */
+@Component
+public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
+
+ /**
+ * taskInstance caceh
+ */
+ private Map<Integer,TaskInstance> taskInstanceCache = new
ConcurrentHashMap<>();
+
+
+ /**
+ * get taskInstance by taskInstance id
+ *
+ * @param taskInstanceId taskInstanceId
+ * @return taskInstance
+ */
+ @Override
+ public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
+ return taskInstanceCache.get(taskInstanceId);
+ }
+
+ /**
+ * cache taskInstance
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ @Override
+ public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
+ TaskInstance taskInstance =
getByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ if (taskInstance == null){
+ taskInstance = new TaskInstance();
+ }
+ taskInstance.setId(taskExecutionContext.getTaskInstanceId());
+ taskInstance.setName(taskExecutionContext.getTaskName());
+ taskInstance.setStartTime(taskExecutionContext.getStartTime());
+ taskInstance.setTaskType(taskInstance.getTaskType());
+ taskInstance.setExecutePath(taskInstance.getExecutePath());
+ taskInstance.setTaskJson(taskInstance.getTaskJson());
+ }
+
+ /**
+ * cache taskInstance
+ *
+ * @param taskAckCommand taskAckCommand
+ */
+ @Override
+ public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
+ TaskInstance taskInstance =
getByTaskInstanceId(taskAckCommand.getTaskInstanceId());
+ if (taskInstance == null){
+ taskInstance = new TaskInstance();
+ }
+ taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
+ taskInstance.setStartTime(taskAckCommand.getStartTime());
+ taskInstance.setHost(taskAckCommand.getHost());
+ taskInstance.setExecutePath(taskAckCommand.getExecutePath());
+ taskInstance.setLogPath(taskAckCommand.getLogPath());
+ }
+
+ /**
+ * cache taskInstance
+ *
+ * @param executeTaskResponseCommand executeTaskResponseCommand
+ */
+ @Override
+ public void cacheTaskInstance(ExecuteTaskResponseCommand
executeTaskResponseCommand) {
+ TaskInstance taskInstance =
getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
+ if (taskInstance == null){
+ taskInstance = new TaskInstance();
+ }
+
taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
+ taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
+ }
+
+ /**
+ * remove taskInstance by taskInstanceId
+ * @param taskInstanceId taskInstanceId
+ */
+ @Override
+ public void removeByTaskInstanceId(Integer taskInstanceId) {
+ taskInstanceCache.remove(taskInstanceId);
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 83da3b0..cf38579 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -42,8 +44,14 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
*/
private final ProcessService processService;
+ /**
+ * taskInstance cache manager
+ */
+ private final TaskInstanceCacheManager taskInstanceCacheManager;
+
public TaskAckProcessor(){
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
+ this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@@ -55,7 +63,9 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
ExecuteTaskAckCommand taskAckCommand =
FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
- logger.info("taskAckCommand : {}",taskAckCommand);
+ logger.info("taskAckCommand : {}", taskAckCommand);
+
+ taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
/**
* change Task state
*/
@@ -65,6 +75,7 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
+
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index bbc710c..d6279c6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -42,8 +44,14 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
*/
private final ProcessService processService;
+ /**
+ * taskInstance cache manager
+ */
+ private final TaskInstanceCacheManager taskInstanceCacheManager;
+
public TaskResponseProcessor(){
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
+ this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@@ -56,9 +64,15 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
- logger.info("received command : {}", command);
+
ExecuteTaskResponseCommand responseCommand =
FastJsonSerializer.deserialize(command.getBody(),
ExecuteTaskResponseCommand.class);
-
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
+ logger.info("received command : {}", responseCommand);
+
+ taskInstanceCacheManager.cacheTaskInstance(responseCommand);
+
+
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
+ responseCommand.getEndTime(),
+ responseCommand.getTaskInstanceId());
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 9bf69dd..d2a5439 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
+import static
org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
+
/**
* master task exec base class
*/
@@ -163,6 +166,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
String userQueue =
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+ taskInstance.setExecutePath(getExecLocalPath(taskInstance));
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
@@ -173,6 +177,19 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
/**
+ * get execute local path
+ *
+ * @return execute local path
+ */
+ private String getExecLocalPath(TaskInstance taskInstance){
+ return
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+ taskInstance.getProcessDefine().getId(),
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ }
+
+
+ /**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
@@ -187,19 +204,6 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
return false;
}
-
- /**
- * get execute local path
- *
- * @return execute local path
- */
- private String getExecLocalPath(TaskInstance taskInstance){
- return
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
- taskInstance.getProcessDefine().getId(),
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
- }
-
/**
* submit master base task exec thread
* @return TaskInstance
@@ -210,7 +214,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
int retryTimes = 1;
boolean submitDB = false;
- boolean submitQueue = false;
+ boolean submitTask = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes){
try {
@@ -221,27 +225,60 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
submitDB = true;
}
}
- if(submitDB && !submitQueue){
- // submit task to queue
- submitQueue = dispatch(task);
+ if(submitDB && !submitTask){
+ // dispatcht task
+ submitTask = dispatchtTask(task);
}
- if(submitDB && submitQueue){
+ if(submitDB && submitTask){
return task;
}
if(!submitDB){
logger.error("task commit to db failed , taskId {} has
already retry {} times, please check the database", taskInstance.getId(),
retryTimes);
- }else if(!submitQueue){
- logger.error("task commit to queue failed , taskId {} has
already retry {} times, please check the queue", taskInstance.getId(),
retryTimes);
+ }else if(!submitTask){
+ logger.error("task commit failed , taskId {} has already
retry {} times, please check", taskInstance.getId(), retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
- logger.error("task commit to mysql and queue failed",e);
+ logger.error("task commit to mysql and dispatcht task
failed",e);
}
retryTimes += 1;
}
return task;
}
+
+
+ /**
+ * dispatcht task
+ * @param taskInstance taskInstance
+ * @return whether submit task success
+ */
+ public Boolean dispatchtTask(TaskInstance taskInstance) {
+
+ try{
+ if(taskInstance.isSubProcess()){
+ return true;
+ }
+ if(taskInstance.getState().typeIsFinished()){
+ logger.info(String.format("submit task , but task [%s] state
[%s] is already finished. ", taskInstance.getName(),
taskInstance.getState().toString()));
+ return true;
+ }
+ // task cannot submit when running
+ if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
+ logger.info(String.format("submit to task, but task [%s] state
already be running. ", taskInstance.getName()));
+ return true;
+ }
+ logger.info("task ready to submit: {}" , taskInstance);
+ boolean submitTask = dispatch(taskInstance);
+ logger.info(String.format("master submit success, task : %s",
taskInstance.getName()) );
+ return submitTask;
+ }catch (Exception e){
+ logger.error("submit task Exception: ", e);
+ logger.error("task error : %s", JSONUtils.toJson(taskInstance));
+ return false;
+ }
+ }
+
/**
* submit wait complete
* @return true
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index d0f4927..576dc76 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -404,7 +404,7 @@ public class MasterExecThread implements Runnable {
}
/**
- * submit task to execute
+ * TODO submit task to execute
* @param taskInstance task instance
* @return TaskInstance
*/
@@ -873,7 +873,7 @@ public class MasterExecThread implements Runnable {
}
logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(),
task.getState().toString());
- // node success , post node submit
+ //TODO node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 66d1a3f..deac503 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,6 +26,9 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
+import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,12 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
*/
private static final Logger logger =
LoggerFactory.getLogger(MasterTaskExecThread.class);
+
+ /**
+ * taskInstance state manager
+ */
+ private TaskInstanceCacheManager taskInstanceCacheManager;
+
/**
* constructor of MasterTaskExecThread
* @param taskInstance task instance
@@ -50,6 +59,7 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
*/
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance
processInstance){
super(taskInstance, processInstance);
+ this.taskInstanceCacheManager =
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@@ -67,7 +77,7 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
private Boolean alreadyKilled = false;
/**
- * submit task instance and wait complete
+ * TODO submit task instance and wait complete
* @return true is task quit is true
*/
@Override
@@ -89,12 +99,16 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
}
/**
- * wait task quit
+ * TODO 在这里轮询数据库
+ * TODO wait task quit
* @return true if task quit success
*/
public Boolean waitTaskQuit(){
// query new state
- taskInstance =
processService.findTaskInstanceById(taskInstance.getId());
+ taskInstance =
taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
+ if (taskInstance == null){
+ taskInstance =
processService.findTaskInstanceById(taskInstance.getId());
+ }
logger.info("wait task: process id: {}, task id:{}, task name:{}
complete",
this.taskInstance.getProcessInstanceId(),
this.taskInstance.getId(), this.taskInstance.getName());
// task time out
@@ -119,6 +133,8 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
}
// task instance finished
if (taskInstance.getState().typeIsFinished()){
+ // if task is final result , then remove taskInstance from
cache
+
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break;
}
if(checkTimeout){
@@ -133,7 +149,7 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
}
}
// updateProcessInstance task instance
- taskInstance =
processService.findTaskInstanceById(taskInstance.getId());
+ taskInstance =
taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
processInstance =
processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
@@ -149,6 +165,7 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
/**
+ * TODO Kill 任务
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance(){
@@ -162,6 +179,7 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
}
String queueValue = String.format("%s-%d",
host, taskInstance.getId());
+ // TODO 这里写
taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
logger.info("master add kill task :{} id:{} to kill queue",
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 818e223..37dcc2c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -155,6 +155,7 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
+ taskExecutionContext.setLogPath(ackCommand.getLogPath());
return ackCommand;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 2985374..1f5aa3c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -42,6 +42,12 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(TaskKillProcessor.class);
+ /**
+ * task kill process
+ *
+ * @param channel channel
+ * @param command command
+ */
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
@@ -51,6 +57,11 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
}
+ /**
+ * kill task logic
+ *
+ * @param killCommand killCommand
+ */
private void doKill(KillTaskRequestCommand killCommand){
try {
if(killCommand.getProcessId() == 0 ){
@@ -71,6 +82,14 @@ public class TaskKillProcessor implements
NettyRequestProcessor {
}
}
+ /**
+ * kill yarn job
+ *
+ * @param host host
+ * @param logPath logPath
+ * @param executePath executePath
+ * @param tenantCode tenantCode
+ */
public void killYarnJob(String host, String logPath, String executePath,
String tenantCode) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index be89401..c0dd25b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -103,7 +103,6 @@ public class TaskExecuteThread implements Runnable {
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
- taskExecutionContext.getExecutePath(),
taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(),
@@ -114,7 +113,10 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getStartTime(),
getGlobalParamsMap(),
null,
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()));
+
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+ OSUtils.getHost(),
+ taskExecutionContext.getLogPath(),
+ taskExecutionContext.getExecutePath());
// set task timeout
setTaskTimeout(taskProps, taskNode);
@@ -142,7 +144,7 @@ public class TaskExecuteThread implements Runnable {
// task result process
task.after();
- //
+
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index c473f3a..c2e9e28 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
import org.slf4j.Logger;
import java.io.*;
@@ -37,6 +41,8 @@ import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.dolphinscheduler.common.Constants.*;
+
/**
* abstract command executor
*/
@@ -69,7 +75,7 @@ public abstract class AbstractCommandExecutor {
/**
* task appId
*/
- protected final int taskInstId;
+ protected final int taskInstanceId;
/**
* tenant code , execute task linux user
@@ -101,93 +107,37 @@ public abstract class AbstractCommandExecutor {
*/
protected final List<String> logBuffer;
+ /**
+ * log path
+ */
+ private String logPath;
+
+ /**
+ * execute path
+ */
+ private String executePath;
public AbstractCommandExecutor(Consumer<List<String>> logHandler,
- String taskDir, String taskAppId,int
taskInstId,String tenantCode, String envFile,
- Date startTime, int timeout, Logger logger){
+ String taskDir,
+ String taskAppId,
+ Integer taskInstanceId,
+ String tenantCode, String envFile,
+ Date startTime, int timeout, String
logPath,String executePath,Logger logger){
this.logHandler = logHandler;
this.taskDir = taskDir;
this.taskAppId = taskAppId;
- this.taskInstId = taskInstId;
+ this.taskInstanceId = taskInstanceId;
this.tenantCode = tenantCode;
this.envFile = envFile;
this.startTime = startTime;
this.timeout = timeout;
+ this.logPath = logPath;
+ this.executePath = executePath;
this.logger = logger;
this.logBuffer = Collections.synchronizedList(new ArrayList<>());
}
/**
- * task specific execution logic
- *
- * @param execCommand exec command
- * @param processService process dao
- * @return exit status code
- */
- public int run(String execCommand, ProcessService processService) {
- int exitStatusCode;
-
- try {
- if (StringUtils.isEmpty(execCommand)) {
- exitStatusCode = 0;
- return exitStatusCode;
- }
-
- String commandFilePath = buildCommandFilePath();
-
- // create command file if not exists
- createCommandFileIfNotExists(execCommand, commandFilePath);
-
- //build process
- buildProcess(commandFilePath);
-
- // parse process output
- parseProcessOutput(process);
-
- // get process id
- int pid = getProcessId(process);
-
- processService.updatePidByTaskInstId(taskInstId, pid, "");
-
- logger.info("process start, process id is: {}", pid);
-
- // if timeout occurs, exit directly
- long remainTime = getRemaintime();
-
- // waiting for the run to finish
- boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
-
- if (status) {
- exitStatusCode = process.exitValue();
- logger.info("process has exited, work dir:{}, pid:{}
,exitStatusCode:{}", taskDir, pid,exitStatusCode);
- //update process state to db
- exitStatusCode = updateState(processService, exitStatusCode,
pid, taskInstId);
-
- } else {
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstId);
- if (taskInstance == null) {
- logger.error("task instance id:{} not exist", taskInstId);
- } else {
- ProcessUtils.kill(taskInstance);
- }
- exitStatusCode = -1;
- logger.warn("process timeout, work dir:{}, pid:{}", taskDir,
pid);
- }
-
- } catch (InterruptedException e) {
- exitStatusCode = -1;
- logger.error(String.format("interrupt exception: {}, task may be
cancelled or killed",e.getMessage()), e);
- throw new RuntimeException("interrupt exception. exitCode is : "
+ exitStatusCode);
- } catch (Exception e) {
- exitStatusCode = -1;
- logger.error(e.getMessage(), e);
- throw new RuntimeException("process error . exitCode is : " +
exitStatusCode);
- }
-
- return exitStatusCode;
- }
-
- /**
* build process
*
* @param commandFile command file
@@ -217,35 +167,80 @@ public abstract class AbstractCommandExecutor {
}
/**
- * update process state to db
- *
- * @param processService process dao
- * @param exitStatusCode exit status code
- * @param pid process id
- * @param taskInstId task instance id
- * @return exit status code
+ * task specific execution logic
+ * @param execCommand execCommand
+ * @return CommandExecuteResult
+ * @throws Exception
*/
- private int updateState(ProcessService processService, int exitStatusCode,
int pid, int taskInstId) {
- //get yarn state by log
- if (exitStatusCode == 0) {
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstId);
- logger.info("process id is {}", pid);
-
- List<String> appIds = getAppLinks(taskInstance.getLogPath());
- if (appIds.size() > 0) {
- String appUrl = String.join(Constants.COMMA, appIds);
- logger.info("yarn log url:{}",appUrl);
- processService.updatePidByTaskInstId(taskInstId, pid, appUrl);
- }
+ public CommandExecuteResult run(String execCommand) throws Exception{
- // check if all operations are completed
- if (!isSuccessOfYarnState(appIds)) {
- exitStatusCode = -1;
- }
+ CommandExecuteResult result = new CommandExecuteResult();
+
+
+ if (StringUtils.isEmpty(execCommand)) {
+ return result;
+ }
+
+ String commandFilePath = buildCommandFilePath();
+
+ // create command file if not exists
+ createCommandFileIfNotExists(execCommand, commandFilePath);
+
+ //build process
+ buildProcess(commandFilePath);
+
+ // parse process output
+ parseProcessOutput(process);
+
+
+ Integer processId = getProcessId(process);
+
+ result.setProcessId(processId);
+
+ // print process id
+ logger.info("process start, process id is: {}", processId);
+
+ // if timeout occurs, exit directly
+ long remainTime = getRemaintime();
+
+ // waiting for the run to finish
+ boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
+
+ // SHELL task state
+ result.setExitStatusCode(process.exitValue());
+
+ logger.info("process has exited, execute path:{}, processId:{}
,exitStatusCode:{}",
+ taskDir,
+ processId
+ , result.getExitStatusCode());
+
+ // if SHELL task exit
+ if (status) {
+ // set appIds
+ List<String> appIds = getAppIds(logPath);
+ result.setAppIds(String.join(Constants.COMMA, appIds));
+
+ // if yarn task , yarn state is final state
+ result.setExitStatusCode(isSuccessOfYarnState(appIds) ?
EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
+ } else {
+ logger.error("process has failure , exitStatusCode : {} , ready to
kill ...", result.getExitStatusCode());
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setPid(processId);
+ taskInstance.setHost(OSUtils.getHost());
+ taskInstance.setLogPath(logPath);
+ taskInstance.setExecutePath(executePath);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setTenantCode(tenantCode);
+ taskInstance.setProcessInstance(processInstance);
+
+ ProcessUtils.kill(taskInstance);
+ result.setExitStatusCode(EXIT_CODE_FAILURE);
}
- return exitStatusCode;
+ return result;
}
+
/**
* cancel application
* @throws Exception exception
@@ -378,10 +373,6 @@ public abstract class AbstractCommandExecutor {
parseProcessOutputExecutorService.shutdown();
}
- public int getPid() {
- return getProcessId(process);
- }
-
/**
* check yarn state
*
@@ -389,11 +380,10 @@ public abstract class AbstractCommandExecutor {
* @return is success of yarn task state
*/
public boolean isSuccessOfYarnState(List<String> appIds) {
-
boolean result = true;
try {
for (String appId : appIds) {
- while(true){
+ while(Stopper.isRunning()){
ExecutionStatus applicationStatus =
HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final
state:{}",appId,applicationStatus.name());
if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
@@ -406,7 +396,7 @@ public abstract class AbstractCommandExecutor {
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
- }
+ }
} catch (Exception e) {
logger.error(String.format("yarn applications: %s status failed
", appIds.toString()),e);
result = false;
@@ -415,15 +405,20 @@ public abstract class AbstractCommandExecutor {
}
+ public int getProcessId() {
+ return getProcessId(process);
+ }
+
/**
* get app links
- * @param fileName file name
+ *
+ * @param logPath log path
* @return app id list
*/
- private List<String> getAppLinks(String fileName) {
- List<String> logs = convertFile2List(fileName);
+ private List<String> getAppIds(String logPath) {
+ List<String> logs = convertFile2List(logPath);
- List<String> appIds = new ArrayList<String>();
+ List<String> appIds = new ArrayList<>();
/**
* analysis log,get submited yarn application id
*/
@@ -565,6 +560,5 @@ public abstract class AbstractCommandExecutor {
}
protected abstract String buildCommandFilePath();
protected abstract String commandInterpreter();
- protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand,
String commandFile) throws IOException;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index f2772d0..e7c4e69 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -56,6 +56,17 @@ public abstract class AbstractTask {
/**
+ * SHELL process pid
+ */
+ protected Integer processId;
+
+ /**
+ * other resource manager appId , for example : YARN etc
+ */
+ protected String appIds;
+
+
+ /**
* cancel
*/
protected volatile boolean cancel = false;
@@ -119,6 +130,22 @@ public abstract class AbstractTask {
this.exitStatusCode = exitStatusCode;
}
+ public String getAppIds() {
+ return appIds;
+ }
+
+ public void setAppIds(String appIds) {
+ this.appIds = appIds;
+ }
+
+ public Integer getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(Integer processId) {
+ this.processId = processId;
+ }
+
/**
* get task parameters
* @return AbstractParameters
@@ -126,6 +153,7 @@ public abstract class AbstractTask {
public abstract AbstractParameters getParameters();
+
/**
* result processing
*/
@@ -146,7 +174,7 @@ public abstract class AbstractTask {
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
- TaskRecordStatus taskRecordState =
TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate);
+ TaskRecordStatus taskRecordState =
TaskRecordDao.getTaskRecordState(taskProps.getTaskName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 39f4dfb..7a6aca9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -26,11 +26,6 @@ import org.slf4j.Logger;
* abstract yarn task
*/
public abstract class AbstractYarnTask extends AbstractTask {
-
- /**
- * process instance
- */
-
/**
* process task
*/
@@ -50,21 +45,26 @@ public abstract class AbstractYarnTask extends AbstractTask
{
super(taskProps, logger);
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskProps.getTaskDir(),
+ taskProps.getExecutePath(),
taskProps.getTaskAppId(),
- taskProps.getTaskInstId(),
+ taskProps.getTaskInstanceId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
+ taskProps.getLogPath(),
+ taskProps.getExecutePath(),
logger);
}
@Override
public void handle() throws Exception {
try {
- // construct process
- exitStatusCode = shellCommandExecutor.run(buildCommand(),
processService);
+ // SHELL task exit code
+ CommandExecuteResult commandExecuteResult =
shellCommandExecutor.run(buildCommand());
+ setExitStatusCode(commandExecuteResult.getExitStatusCode());
+ setAppIds(commandExecuteResult.getAppIds());
+ setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
@@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
cancel = true;
// cancel process
shellCommandExecutor.cancelApplication();
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskProps.getTaskInstId());
+ TaskInstance taskInstance =
processService.findTaskInstanceById(taskProps.getTaskInstanceId());
if (status && taskInstance != null){
ProcessUtils.killYarnJob(taskInstance);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
new file mode 100644
index 0000000..5d1afe5
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+/**
+ * command execute result
+ */
+public class CommandExecuteResult {
+
+ /**
+ * command exit code
+ */
+ private Integer exitStatusCode;
+
+ /**
+ * appIds
+ */
+ private String appIds;
+
+ /**
+ * process id
+ */
+ private Integer processId;
+
+
+ public CommandExecuteResult(){
+ this.exitStatusCode = 0;
+ }
+
+
+ public Integer getExitStatusCode() {
+ return exitStatusCode;
+ }
+
+ public void setExitStatusCode(Integer exitStatusCode) {
+ this.exitStatusCode = exitStatusCode;
+ }
+
+ public String getAppIds() {
+ return appIds;
+ }
+
+ public void setAppIds(String appIds) {
+ this.appIds = appIds;
+ }
+
+ public Integer getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(Integer processId) {
+ this.processId = processId;
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index a673134..544a355 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -67,8 +67,10 @@ public class PythonCommandExecutor extends
AbstractCommandExecutor {
String envFile,
Date startTime,
int timeout,
+ String logPath,
+ String executePath,
Logger logger) {
- super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile,
startTime, timeout, logger);
+ super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile,
startTime, timeout,logPath,executePath,logger);
}
@@ -132,15 +134,6 @@ public class PythonCommandExecutor extends
AbstractCommandExecutor {
return pythonHome;
}
- /**
- * check find yarn application id
- * @param line line
- * @return boolean
- */
- @Override
- protected boolean checkFindApp(String line) {
- return true;
- }
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index db46d0d..7985253 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -53,13 +53,15 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
public ShellCommandExecutor(Consumer<List<String>> logHandler,
String taskDir,
String taskAppId,
- int taskInstId,
+ Integer taskInstId,
String tenantCode,
String envFile,
Date startTime,
- int timeout,
+ Integer timeout,
+ String logPath,
+ String executePath,
Logger logger) {
- super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile,
startTime, timeout, logger);
+ super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile,
startTime, timeout,logPath,executePath,logger);
}
@@ -78,15 +80,6 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
return SH;
}
- /**
- * check find yarn application id
- * @param line line
- * @return true if line contains task app id
- */
- @Override
- protected boolean checkFindApp(String line) {
- return line.contains(taskAppId);
- }
/**
* create command file if not exists
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
index 8e5644e..a7b66bb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
@@ -35,12 +35,12 @@ public class TaskProps {
/**
* task node name
**/
- private String nodeName;
+ private String taskName;
/**
* task instance id
**/
- private int taskInstId;
+ private int taskInstanceId;
/**
* tenant code , execute task linux user
@@ -58,11 +58,6 @@ public class TaskProps {
private String taskParams;
/**
- * task dir
- **/
- private String taskDir;
-
- /**
* queue
**/
private String queue;
@@ -111,6 +106,22 @@ public class TaskProps {
*/
private CommandType cmdTypeIfComplement;
+
+ /**
+ * host
+ */
+ private String host;
+
+ /**
+ * log path
+ */
+ private String logPath;
+
+ /**
+ * execute path
+ */
+ private String executePath;
+
/**
* constructor
*/
@@ -123,7 +134,7 @@ public class TaskProps {
* @param scheduleTime schedule time
* @param nodeName node name
* @param taskType task type
- * @param taskInstId task instance id
+ * @param taskInstanceId task instance id
* @param envFile env file
* @param tenantCode tenant code
* @param queue queue
@@ -133,24 +144,25 @@ public class TaskProps {
* @param cmdTypeIfComplement cmd type if complement
*/
public TaskProps(String taskParams,
- String taskDir,
Date scheduleTime,
String nodeName,
String taskType,
- int taskInstId,
+ int taskInstanceId,
String envFile,
String tenantCode,
String queue,
Date taskStartTime,
Map<String, String> definedParams,
String dependence,
- CommandType cmdTypeIfComplement){
+ CommandType cmdTypeIfComplement,
+ String host,
+ String logPath,
+ String executePath){
this.taskParams = taskParams;
- this.taskDir = taskDir;
this.scheduleTime = scheduleTime;
- this.nodeName = nodeName;
+ this.taskName = nodeName;
this.taskType = taskType;
- this.taskInstId = taskInstId;
+ this.taskInstanceId = taskInstanceId;
this.envFile = envFile;
this.tenantCode = tenantCode;
this.queue = queue;
@@ -158,7 +170,9 @@ public class TaskProps {
this.definedParams = definedParams;
this.dependence = dependence;
this.cmdTypeIfComplement = cmdTypeIfComplement;
-
+ this.host = host;
+ this.logPath = logPath;
+ this.executePath = executePath;
}
public String getTenantCode() {
@@ -177,12 +191,12 @@ public class TaskProps {
this.taskParams = taskParams;
}
- public String getTaskDir() {
- return taskDir;
+ public String getExecutePath() {
+ return executePath;
}
- public void setTaskDir(String taskDir) {
- this.taskDir = taskDir;
+ public void setExecutePath(String executePath) {
+ this.executePath = executePath;
}
public Map<String, String> getDefinedParams() {
@@ -202,20 +216,20 @@ public class TaskProps {
}
- public String getNodeName() {
- return nodeName;
+ public String getTaskName() {
+ return taskName;
}
- public void setNodeName(String nodeName) {
- this.nodeName = nodeName;
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
}
- public int getTaskInstId() {
- return taskInstId;
+ public int getTaskInstanceId() {
+ return taskInstanceId;
}
- public void setTaskInstId(int taskInstId) {
- this.taskInstId = taskInstId;
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
}
public String getQueue() {
@@ -291,6 +305,22 @@ public class TaskProps {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
/**
* get parameters map
* @return user defined params map
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index ef941cd..7c867f1 100755
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -52,6 +52,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -121,12 +122,12 @@ public class DataxTask extends AbstractTask {
public DataxTask(TaskProps props, Logger logger) {
super(props, logger);
- this.taskDir = props.getTaskDir();
+ this.taskDir = props.getExecutePath();
logger.info("task dir : {}", taskDir);
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
props.getTaskDir(), props.getTaskAppId(),
- props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(),
props.getTaskStartTime(),
- props.getTaskTimeout(), logger);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
props.getExecutePath(), props.getTaskAppId(),
+ props.getTaskInstanceId(), props.getTenantCode(),
props.getEnvFile(), props.getTaskStartTime(),
+ props.getTaskTimeout(),
props.getLogPath(),props.getExecutePath(),logger);
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
}
@@ -160,10 +161,15 @@ public class DataxTask extends AbstractTask {
// run datax process
String jsonFilePath = buildDataxJsonFile();
String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
- exitStatusCode = shellCommandExecutor.run(shellCommandFilePath,
processService);
+ CommandExecuteResult commandExecuteResult =
shellCommandExecutor.run(shellCommandFilePath);
+
+ setExitStatusCode(commandExecuteResult.getExitStatusCode());
+ setAppIds(commandExecuteResult.getAppIds());
+ setProcessId(commandExecuteResult.getProcessId());
}
catch (Exception e) {
- exitStatusCode = -1;
+ logger.error("datax task failure", e);
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@@ -355,7 +361,7 @@ public class DataxTask extends AbstractTask {
String dataxCommand = sbr.toString();
// find process instance by task id
- ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
// combining local and global parameters
Map<String, Property> paramsMap =
ParamUtils.convert(taskProps.getUserDefParamsMap(),
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
index f074d57..22cd60e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
@@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask {
try{
TaskInstance taskInstance = null;
while(Stopper.isRunning()){
- taskInstance =
processService.findTaskInstanceById(this.taskProps.getTaskInstId());
+ taskInstance =
processService.findTaskInstanceById(this.taskProps.getTaskInstanceId());
if(taskInstance == null){
exitStatusCode = -1;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index c562fbe..ead61f0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
// get process instance by task instance id
- ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
/**
* combining local and global parameters
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index c925f90..c1d1ed8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask {
*/
protected CloseableHttpResponse sendRequest(CloseableHttpClient client)
throws IOException {
RequestBuilder builder = createRequestBuilder();
- ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
Map<String, Property> paramsMap =
ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index fc212f8..cae5324 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.python;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
@@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -65,16 +67,18 @@ public class PythonTask extends AbstractTask {
public PythonTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
- this.taskDir = taskProps.getTaskDir();
+ this.taskDir = taskProps.getExecutePath();
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
- taskProps.getTaskDir(),
+ taskProps.getExecutePath(),
taskProps.getTaskAppId(),
- taskProps.getTaskInstId(),
+ taskProps.getTaskInstanceId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
+ taskProps.getLogPath(),
+ taskProps.getExecutePath(),
logger);
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
}
@@ -94,10 +98,15 @@ public class PythonTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = pythonCommandExecutor.run(buildCommand(),
processService);
- } catch (Exception e) {
+ CommandExecuteResult commandExecuteResult =
pythonCommandExecutor.run(buildCommand());
+
+ setExitStatusCode(commandExecuteResult.getExitStatusCode());
+ setAppIds(commandExecuteResult.getAppIds());
+ setProcessId(commandExecuteResult.getProcessId());
+ }
+ catch (Exception e) {
logger.error("python task failure", e);
- exitStatusCode = -1;
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 5704c80..b703a44 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -74,15 +75,17 @@ public class ShellTask extends AbstractTask {
public ShellTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
- this.taskDir = taskProps.getTaskDir();
+ this.taskDir = taskProps.getExecutePath();
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getExecutePath(),
taskProps.getTaskAppId(),
- taskProps.getTaskInstId(),
+ taskProps.getTaskInstanceId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
+ taskProps.getLogPath(),
+ taskProps.getExecutePath(),
logger);
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
}
@@ -102,10 +105,13 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = shellCommandExecutor.run(buildCommand(),
processService);
+ CommandExecuteResult commandExecuteResult =
shellCommandExecutor.run(buildCommand());
+ setExitStatusCode(commandExecuteResult.getExitStatusCode());
+ setAppIds(commandExecuteResult.getAppIds());
+ setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("shell task failure", e);
- exitStatusCode = -1;
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index aae11f5..e3a4cf7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -316,7 +316,7 @@ public class SqlTask extends AbstractTask {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray,
SerializerFeature.WriteMapNullValue));
}else{
- sendAttachment(taskProps.getNodeName() + " query
resultsets ",
+ sendAttachment(taskProps.getTaskName() + " query
resultsets ",
JSONObject.toJSONString(resultJSONArray,
SerializerFeature.WriteMapNullValue));
}
}
@@ -384,7 +384,7 @@ public class SqlTask extends AbstractTask {
public void sendAttachment(String title,String content){
// process instance
- ProcessInstance instance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance instance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
List<User> users =
alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
@@ -471,7 +471,7 @@ public class SqlTask extends AbstractTask {
*/
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance
- ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new
PermissionCheck<Integer>(AuthorizationType.UDF,
processService,udfFunIds,userId,logger);
@@ -485,7 +485,7 @@ public class SqlTask extends AbstractTask {
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
- ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance =
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new
PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new
Integer[]{dataSourceId},userId,logger);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index 5d42636..1a8a4ff 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -55,13 +55,13 @@ public class ShellCommandExecutorTest {
TaskProps taskProps = new TaskProps();
// processDefineId_processInstanceId_taskInstanceId
-
taskProps.setTaskDir("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657");
+
taskProps.setExecutePath("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657");
taskProps.setTaskAppId("36_2864_7657");
// set tenant -> task execute linux user
taskProps.setTenantCode("hdfs");
taskProps.setTaskStartTime(new Date());
taskProps.setTaskTimeout(360000);
- taskProps.setTaskInstId(7657);
+ taskProps.setTaskInstanceId(7657);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
index c395eab..32e0b2f 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
@@ -97,15 +97,15 @@ public class SqlExecutorTest {
*/
private void sharedTestSqlTask(String nodeName, String taskAppId, String
tenantCode, int taskInstId) throws Exception {
TaskProps taskProps = new TaskProps();
- taskProps.setTaskDir("");
+ taskProps.setExecutePath("");
// processDefineId_processInstanceId_taskInstanceId
taskProps.setTaskAppId(taskAppId);
// set tenant -> task execute linux user
taskProps.setTenantCode(tenantCode);
taskProps.setTaskStartTime(new Date());
taskProps.setTaskTimeout(360000);
- taskProps.setTaskInstId(taskInstId);
- taskProps.setNodeName(nodeName);
+ taskProps.setTaskInstanceId(taskInstId);
+ taskProps.setTaskName(nodeName);
taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index bd7f275..d1b82f2 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -71,9 +71,9 @@ public class DataxTaskTest {
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
- props.setTaskDir("/tmp");
+ props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
- props.setTaskInstId(1);
+ props.setTaskInstanceId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
@@ -87,8 +87,8 @@ public class DataxTaskTest {
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
- String fileName = String.format("%s/%s_node.sh", props.getTaskDir(),
props.getTaskAppId());
- Mockito.when(shellCommandExecutor.run(fileName,
processService)).thenReturn(0);
+ String fileName = String.format("%s/%s_node.sh",
props.getExecutePath(), props.getTaskAppId());
+ Mockito.when(shellCommandExecutor.run(fileName)).thenReturn(null);
}
private DataSource getDataSource() {
@@ -118,9 +118,9 @@ public class DataxTaskTest {
public void testDataxTask()
throws Exception {
TaskProps props = new TaskProps();
- props.setTaskDir("/tmp");
+ props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
- props.setTaskInstId(1);
+ props.setTaskInstanceId(1);
props.setTenantCode("1");
Assert.assertNotNull(new DataxTask(props, logger));
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
index 272fb54..a6a2587 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
@@ -50,7 +50,7 @@ public class DependentTaskTest {
"\"relation\":\"OR\"\n" +
"}";
- taskProps.setTaskInstId(252612);
+ taskProps.setTaskInstanceId(252612);
taskProps.setDependence(dependString);
DependentTask dependentTask = new DependentTask(taskProps, logger);
dependentTask.init();
diff --git
a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
index dbe3e1d..7874b53 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
@@ -170,7 +170,7 @@
*/
_downloadLog () {
downloadFile('/dolphinscheduler/log/download-log', {
- taskInstId: this.stateId || this.logId
+ taskInstanceId: this.stateId || this.logId
})
},
/**
@@ -256,7 +256,7 @@
computed: {
_rtParam () {
return {
- taskInstId: this.stateId || this.logId,
+ taskInstanceId: this.stateId || this.logId,
skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex +
'000' : 0}`),
limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 1 :
1}000`)
}