ruanwenjun commented on code in PR #13250:
URL: https://github.com/apache/dolphinscheduler/pull/13250#discussion_r1056727269
##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -398,9 +398,6 @@ public boolean taskCanRetry() {
if (this.isSubProcess()) {
return false;
}
- if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
- return true;
- }
Review Comment:
Why did you remove this? Without it, failover will no longer create a new task
instance. When we do failover, some task information will be overwritten if we
don't create a new one.
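To make the failover concern concrete, here is a minimal sketch using hypothetical, trimmed-down stand-ins (`TaskStatus`, `SimpleTaskInstance`) rather than the real entity. It assumes the remaining branch of `taskCanRetry()` compares retry counts. The point is that `NEED_FAULT_TOLERANCE` keeps the task retryable even when the retry budget is used up, and failover then builds a new instance instead of overwriting the original one.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for TaskExecutionStatus and TaskInstance, trimmed to
// the fields needed to show the failover path. Not the real DolphinScheduler classes.
enum TaskStatus { RUNNING_EXECUTION, FAILURE, NEED_FAULT_TOLERANCE, SUCCESS }

final class SimpleTaskInstance {
    private static final AtomicInteger ID_GEN = new AtomicInteger(1);

    final int id;
    final String host;          // worker host this attempt ran on
    final TaskStatus state;
    final boolean subProcess;
    final int retryTimes;
    final int maxRetryTimes;

    SimpleTaskInstance(String host, TaskStatus state, boolean subProcess,
                       int retryTimes, int maxRetryTimes) {
        this.id = ID_GEN.getAndIncrement();
        this.host = host;
        this.state = state;
        this.subProcess = subProcess;
        this.retryTimes = retryTimes;
        this.maxRetryTimes = maxRetryTimes;
    }

    // Same shape as taskCanRetry() before this PR: sub-processes never retry,
    // NEED_FAULT_TOLERANCE always does (regardless of the retry budget), and
    // everything else falls back to a retry-count check (assumed here).
    boolean taskCanRetry() {
        if (subProcess) {
            return false;
        }
        if (state == TaskStatus.NEED_FAULT_TOLERANCE) {
            return true;
        }
        return state == TaskStatus.FAILURE && retryTimes < maxRetryTimes;
    }

    // Failover builds a new instance with a new id instead of mutating this one,
    // so the original attempt's host and state are not overwritten.
    SimpleTaskInstance newInstanceForFailover(String newHost) {
        return new SimpleTaskInstance(newHost, TaskStatus.RUNNING_EXECUTION,
                subProcess, retryTimes, maxRetryTimes);
    }
}
```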
##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java:
##########
@@ -186,6 +188,42 @@ public static String getPidsStr(int processId) throws Exception {
return String.join(" ", pidList).trim();
}
+ /**
+ * get remote host pids str
+ * @param host
+ * @param processId
+ * @return
+ * @throws Exception
+ */
+ public static String getHostPidsStr(String host, int processId) throws Exception {
Review Comment:
Why do we need to add this method?
##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.java:
##########
@@ -650,6 +652,42 @@ public TaskExecutionStatus getApplicationStatus(String applicationId) throws Bas
return getExecutionStatus(result);
}
+ public TaskExecutionStatus waitApplicationAccepted(String applicationId) throws BaseException {
+ if (StringUtils.isEmpty(applicationId)) {
+ return null;
+ }
+
+ String result;
+ String applicationUrl = getApplicationUrl(applicationId);
+ logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < 60 * 1000) {
+ String responseContent = Boolean.TRUE
+ .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
+ ? KerberosHttpClient.get(applicationUrl)
+ : HttpUtils.get(applicationUrl);
+ if (responseContent != null) {
+ ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
Review Comment:
Please don't use `ObjectNode`; it is not good practice.
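For reference, a sketch of the typed alternative: bind the YARN ResourceManager application response (`{"app": {"state": ..., "finalStatus": ...}}`) to small classes and map the state from typed fields. The records and the `AppPhase` enum below are hypothetical. The deserialization call itself is not shown; it would go through the project's JSON helper, assuming a typed overload such as `JSONUtils.parseObject(String, Class)` is available in the version in use.

```java
import java.util.Locale;

// Hypothetical typed view of the ResourceManager "apps/{appId}" response.
record YarnAppResponse(YarnApp app) { }

record YarnApp(String state, String finalStatus) { }

// Hypothetical local result type; the PR itself returns TaskExecutionStatus.
enum AppPhase { PENDING, ACCEPTED_OR_LATER, FINISHED_OK, FINISHED_FAILED, UNKNOWN }

final class YarnAppStatusMapper {
    private YarnAppStatusMapper() {
    }

    // Reads typed fields instead of walking ObjectNode by string keys, so a
    // missing or renamed field is handled in one place.
    static AppPhase toPhase(YarnAppResponse response) {
        if (response == null || response.app() == null || response.app().state() == null) {
            return AppPhase.UNKNOWN;
        }
        switch (response.app().state().toUpperCase(Locale.ROOT)) {
            case "NEW":
            case "NEW_SAVING":
            case "SUBMITTED":
                return AppPhase.PENDING;
            case "ACCEPTED":
            case "RUNNING":
                return AppPhase.ACCEPTED_OR_LATER;
            case "FINISHED":
                // A FINISHED application can still have failed; check finalStatus.
                return "SUCCEEDED".equalsIgnoreCase(response.app().finalStatus())
                        ? AppPhase.FINISHED_OK
                        : AppPhase.FINISHED_FAILED;
            case "FAILED":
            case "KILLED":
                return AppPhase.FINISHED_FAILED;
            default:
                return AppPhase.UNKNOWN;
        }
    }
}
```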
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -77,8 +77,8 @@ public void init() throws TaskException {
}
@Override
- public List<String> getApplicationIds() throws TaskException {
- return Collections.emptyList();
+ public Set<String> getApplicationIds() throws TaskException {
Review Comment:
You don't need to make this change here; the returned result should be
de-duplicated by the task itself.
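A minimal sketch of keeping the `List<String>` return type and de-duplicating inside the task; `AppIdCollector` is a hypothetical helper, not an existing class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: a task would call this from getApplicationIds()
// so callers still receive a List<String>, just without duplicates.
final class AppIdCollector {
    private AppIdCollector() {
    }

    // Keeps first-seen order, trims ids, and drops blanks and duplicates.
    static List<String> distinctAppIds(List<String> rawIds) {
        if (rawIds == null || rawIds.isEmpty()) {
            return Collections.emptyList();
        }
        return rawIds.stream()
                .filter(id -> id != null && !id.trim().isEmpty())
                .map(String::trim)
                .distinct()
                .collect(Collectors.toList());
    }
}
```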
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java:
##########
@@ -79,6 +79,7 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance tas
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
taskExecutionContext.setAppIds(taskInstance.getAppLink());
+ taskExecutionContext.setProcessId(taskInstance.getPid());
Review Comment:
It's not good practice to set the process id here. When a worker fails over,
the pid is useless, and when the master fails over, the pid can be obtained
from the worker by task instance id.
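A rough sketch of the master-failover path described above, assuming the worker keeps the pid of each running task keyed by task instance id. `WorkerProcessRegistry` is hypothetical, a stand-in for whatever per-task state the worker already holds; the master-to-worker RPC is omitted.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical worker-side registry: the worker records the pid of each task
// it launches, so the pid never needs to travel in TaskExecutionContext.
final class WorkerProcessRegistry {
    private final Map<Integer, Integer> pidByTaskInstanceId = new ConcurrentHashMap<>();

    void register(int taskInstanceId, int pid) {
        pidByTaskInstanceId.put(taskInstanceId, pid);
    }

    void unregister(int taskInstanceId) {
        pidByTaskInstanceId.remove(taskInstanceId);
    }

    // What a master taking over after a crash would ask the worker for.
    OptionalInt lookupPid(int taskInstanceId) {
        Integer pid = pidByTaskInstanceId.get(taskInstanceId);
        return pid == null ? OptionalInt.empty() : OptionalInt.of(pid);
    }
}
```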
##########
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java:
##########
@@ -72,6 +75,14 @@ public Command convert2Command() {
return command;
}
+ public Command convert2ResponseCommand(long opaque) {
Review Comment:
Could you please split this into separate request and response classes? Don't
use one class to represent both the request and the response.
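A minimal sketch of the split, with hypothetical classes: `SimpleCommand` stands in for the remote `Command` envelope and the type tags are made up. Each class has exactly one conversion method, and the response echoes the request's opaque id, instead of one class offering both `convert2Command()` and `convert2ResponseCommand(long opaque)`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical envelope: a type tag, the opaque id that pairs a response with
// its request, and a body.
final class SimpleCommand {
    final String type;
    final long opaque;
    final String body;

    SimpleCommand(String type, long opaque, String body) {
        this.type = type;
        this.opaque = opaque;
        this.body = body;
    }
}

// Request side: the only place a new opaque id is allocated.
final class HostUpdateRequest {
    private static final AtomicLong OPAQUE_GEN = new AtomicLong();

    final int taskInstanceId;
    final String host;

    HostUpdateRequest(int taskInstanceId, String host) {
        this.taskInstanceId = taskInstanceId;
        this.host = host;
    }

    SimpleCommand toCommand() {
        return new SimpleCommand("HOST_UPDATE_REQUEST", OPAQUE_GEN.incrementAndGet(),
                taskInstanceId + "@" + host);
    }
}

// Response side: never allocates an opaque id; it echoes the request's.
final class HostUpdateResponse {
    final int taskInstanceId;
    final boolean success;

    HostUpdateResponse(int taskInstanceId, boolean success) {
        this.taskInstanceId = taskInstanceId;
        this.success = success;
    }

    SimpleCommand toCommand(long requestOpaque) {
        return new SimpleCommand("HOST_UPDATE_RESPONSE", requestOpaque,
                taskInstanceId + ":" + success);
    }
}
```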
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -959,9 +966,15 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
- if (taskInstance.getState().isRunning()
+ // rebuild channel when master crashes
+ if (taskInstance.getState().isNeedFaultTolerance()
Review Comment:
It's better to use another flag (we already know this when we create the
failover workflow) to determine whether the current workflow is being executed
after a master crash, rather than using isNeedFaultTolerance.
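A sketch of what deciding from the workflow could look like. It assumes the failover workflow is created with, and carries, a command type like `RECOVER_TOLERANCE_FAULT_PROCESS`; the enums and `ChannelRebuildPolicy` are hypothetical stand-ins, and which task states should trigger a rebuild is left open (here, any unfinished one).

```java
// Hypothetical stand-ins. StartCommandType is modelled on a workflow command
// type, but only the two values needed here are included.
enum StartCommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT_PROCESS }

enum SimpleTaskState {
    SUBMITTED_SUCCESS, RUNNING_EXECUTION, NEED_FAULT_TOLERANCE, SUCCESS, FAILURE;

    boolean isFinished() {
        return this == SUCCESS || this == FAILURE;
    }
}

final class ChannelRebuildPolicy {
    private ChannelRebuildPolicy() {
    }

    // "Recovered from a master crash" comes from how the workflow was created,
    // not from the task's own state.
    static boolean shouldRebuildChannel(StartCommandType workflowCommandType,
                                        SimpleTaskState taskState) {
        boolean recoveredAfterMasterCrash =
                workflowCommandType == StartCommandType.RECOVER_TOLERANCE_FAULT_PROCESS;
        return recoveredAfterMasterCrash && !taskState.isFinished();
    }
}
```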
##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -110,9 +118,24 @@ protected void afterExecute() throws TaskException {
if (task == null) {
throw new TaskException("The current task instance is null");
}
+ TaskExecutionStatus taskExecutionStatus = task.getExitStatus();
+
+ if (task.getExitStatus() == TaskExecutionStatus.SUCCESS && StringUtils.isNotEmpty(task.getAppIds())) {
+ // monitor task submitted before
+ logger.info("monitor task by appId {}, maybe has process id {}", task.getAppIds(), task.getProcessId());
+
+ taskExecutionStatus = waitApplicationEnd(task.getAppIds());
+ } else if (task.getExitStatus() == TaskExecutionStatus.SUCCESS && task.getProcessId() > 0) {
+ // monitor task by process id
+ logger.info("monitor task by process id {}, maybe has appId {}", task.getProcessId(), task.getAppIds());
+
+ taskExecutionStatus = waitProcessEnd(task.getProcess());
+
+ }
Review Comment:
Why do we need to wait for the status here? Right now all tasks are
synchronous, so the process should already have exited at this point. Also, the
remote status should be generated by the task plugin rather than by this
runnable.
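A minimal sketch of that split of responsibilities, using hypothetical classes (`SyncTask`, `SimpleTaskRunnable`, `ExitStatus`): the plugin's synchronous `handle()` waits for its own job and records the final status, and the runnable's `afterExecute()` only reads it.

```java
import java.util.Objects;

enum ExitStatus { SUCCESS, FAILURE }

// Hypothetical plugin base class: handle() is synchronous, so when it returns
// the plugin has already tracked its job to completion.
abstract class SyncTask {
    private volatile ExitStatus exitStatus = ExitStatus.FAILURE;

    // Plugin-specific work: submit, block until the local process or remote
    // application has finished, and return its final status.
    abstract ExitStatus runAndWait() throws Exception;

    final void handle() {
        try {
            exitStatus = runAndWait();
        } catch (Exception e) {
            exitStatus = ExitStatus.FAILURE;
        }
    }

    final ExitStatus getExitStatus() {
        return exitStatus;
    }
}

// Hypothetical runnable: no second round of polling YARN or the OS process.
final class SimpleTaskRunnable {
    private final SyncTask task;

    SimpleTaskRunnable(SyncTask task) {
        this.task = Objects.requireNonNull(task, "The current task instance is null");
    }

    ExitStatus execute() {
        task.handle();
        return afterExecute();
    }

    // Only reports what the plugin decided.
    private ExitStatus afterExecute() {
        return task.getExitStatus();
    }
}
```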
--
This is an automated message from the Apache Git Service.