Radeity commented on code in PR #14569:
URL: https://github.com/apache/dolphinscheduler/pull/14569#discussion_r1335370167


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -2248,4 +2269,118 @@ private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
 
         workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
     }
+
+    /**
+     * determines whether the process is running
+     *
+     * @param pid process_id
+     * @return boolean
+     */
+    public boolean isProcessRunning(String pid) throws Exception {

Review Comment:
   May I ask why you pass `processPath` here? Also, for many types of tasks, `executePath` is generated on the `Worker`, so you cannot use this path in the Master.
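If a liveness check is still wanted, it would have to run on the host that launched the process (the Worker), not on the Master. Below is a minimal sketch, assuming Java 9+ and a numeric PID that the Worker already knows; `LocalProcessCheck` and `isProcessAlive` are hypothetical names. It uses `ProcessHandle` instead of shelling out to `ps -ef | grep`, which also avoids grep's substring matches (for example, PID `12` matching a line that contains `123`).

```java
import java.util.Optional;

/** Hypothetical helper: checks whether a process with an exact PID is alive on this host. */
public final class LocalProcessCheck {

    private LocalProcessCheck() {
    }

    /**
     * Returns true only if a process with exactly this PID exists on the current host
     * and is still alive. This is meaningful only on the host that started the process.
     */
    public static boolean isProcessAlive(long pid) {
        Optional<ProcessHandle> handle = ProcessHandle.of(pid);
        return handle.map(ProcessHandle::isAlive).orElse(false);
    }

    public static void main(String[] args) {
        // The current JVM is, by definition, a live local process.
        long self = ProcessHandle.current().pid();
        System.out.println("current JVM alive: " + isProcessAlive(self));
    }
}
```

Note that this still says nothing about processes on other machines, which is the core problem with doing the check from the Master.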



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -2248,4 +2269,118 @@ private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
 
         workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
     }
+
+    /**
+     * determines whether the process is running
+     *
+     * @param pid process_id
+     * @return boolean
+     */
+    public boolean isProcessRunning(String pid) throws Exception {
+
+        String processPath = pid;
+
+        // build shell commands, use ps-ef to list all processes, and use GREP filters to match the process id
+        String command = "/bin/sh -c \"ps -ef | grep " + processPath + " | grep -v grep\"";
+
+        // use the ProcessBuilder class to create a new process and set its command
+        ProcessBuilder processBuilder = new ProcessBuilder("/bin/sh", "-c", command);
+
+        // start the process
+        Process process = processBuilder.start();
+
+        // reads the output of the command and gets the standard input stream for the process
+        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+
+        // reads each line in the process input stream
+        String line;
+        while ((line = reader.readLine()) != null) {
+            if (line.contains(processPath)) {
+                return true;
+            }
+        }
+        try {
+            // gets the exit code for the command.
+            int exitCode = process.waitFor();
+            // if the exit code is 0, and the output contains process id, the process exists.
+            return exitCode == 0;
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * determine if the yarn task is running
+     *
+     * @param applicationId applicationId
+     * @return boolean
+     */
+    public boolean isApplicationRunning(String applicationId ) throws Exception {
+
+        // build shell commands, use yarn application -status applicationId
+        String[] command = {"/bin/bash", "-c", "yarn application -status " + applicationId};
+
+        // use the ProcessBuilder class to create a new process and set its command
+        ProcessBuilder processBuilder = new ProcessBuilder(command);
+
+        // start the process
+        Process process = processBuilder.start();
+
+        // reads the output of the command and gets the standard input stream for the process
+        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+
+        String line;
+        String state = null;
+        // reads each line in the process input stream
+        while ((line = reader.readLine()) != null) {
+            // get the yarn task running status
+            if (line.contains("State")) {
+                state = line.split(":")[1].trim();
+                break;
+            }
+        }
+        try {
+            // gets the exit code for the command.
+            int exitCode = process.waitFor();
+            // if the exit code is 0, and the yarn task is running
+            if (exitCode == 0 && state.equals("RUNNING")) {
+                // process is running
+                return true;
+            } else {
+                return false;
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+    public boolean determineWhetherTaskIsRunning(TaskInstance task) throws Exception {
+        String pidPath = null;
+        if(task.getExecutePath() != null){
+            pidPath = task.getExecutePath();
+
+        }
+        String pid = getDesiredPath(pidPath);
+
+        // Yarn task is determined by parsing whether the task log contains the content of the application
+        String applicationId  = task.getAppLink();

Review Comment:
   Currently, DS does not implement separate methods for submitting and tracking a remote application (see `AbstractRemoteTask.handle()`), so we cannot get the appId from `task.getAppLink()`. You may have to figure out how to take over the remote application effectively.
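To illustrate what splitting submit and track could enable, here is a hypothetical sketch; it is not DolphinScheduler's API, and `RemoteApplication`, `submit`, `track`, and `runOrTakeOver` are invented names. The idea it shows: if the remote application ID is persisted right after submission, a runner restarted after failover can re-attach to the existing application instead of submitting a duplicate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: separate "submit" from "track" so a restarted runner can take over. */
public final class RemoteTakeOverSketch {

    /** Hypothetical contract for a task that runs on a remote cluster (e.g. YARN). */
    interface RemoteApplication {
        /** Submits the application and returns its remote ID. */
        String submit();

        /** Waits for the remote application with this ID and returns its final state. */
        String track(String appId);
    }

    /**
     * If an application ID was persisted before the failover, re-attach to it;
     * otherwise submit once and persist the new ID before tracking it.
     */
    static String runOrTakeOver(RemoteApplication app, String persistedAppId, List<String> persistedIds) {
        String appId = persistedAppId;
        if (appId == null || appId.isEmpty()) {
            appId = app.submit();
            persistedIds.add(appId); // stand-in for saving the ID with the task instance
        }
        return app.track(appId);
    }

    public static void main(String[] args) {
        List<String> persistedIds = new ArrayList<>();
        RemoteApplication fake = new RemoteApplication() {
            @Override
            public String submit() {
                return "application_0001";
            }

            @Override
            public String track(String appId) {
                return appId + ":FINISHED";
            }
        };
        // First run submits; the "restarted" run only tracks the persisted ID.
        System.out.println(runOrTakeOver(fake, null, persistedIds));
        System.out.println(runOrTakeOver(fake, "application_0001", persistedIds));
        System.out.println("submissions: " + persistedIds.size());
    }
}
```

The design point is that submission becomes a one-time, recorded step, while tracking is idempotent and can be repeated by whichever runner currently owns the task.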
   
   



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -825,6 +825,27 @@ private void initTaskQueue() throws StateEventHandleException, CronParseExceptio
             List<TaskInstance> validTaskInstanceList =
                     taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId(),
                             workflowInstance.getTestFlag());
+            // the task instance needs fault tolerance
+            if(workflowInstance.getRecovery() == Flag.YES){
+                //determine whether the task is running
+                for (int i = 0; i < validTaskInstanceList.size(); i++) {
+                    TaskInstance task = validTaskInstanceList.get(i);
+                    boolean isRunningTaskFaultTolerance = false;
+                    try {
+                        isRunningTaskFaultTolerance = determineWhetherTaskIsRunning(task);

Review Comment:
   Agree with @ruanwenjun: it's better to `TAKE-OVER` the running application. This needs some detailed design, such as modifying `workflowInstanceHost` on the Worker.
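A hypothetical Worker-side sketch of that host update; `WorkflowHostRegistry`, `register`, `takeOver`, and `ownerOf` are assumed names, not existing DolphinScheduler classes. The Worker remembers which Master owns each running task instance, and a take-over atomically re-points the task from the failed Master to the new one, so the running application keeps going and reports to its new owner.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical Worker-side registry of which Master host owns each running task instance. */
public final class WorkflowHostRegistry {

    private final Map<Integer, String> hostByTaskInstanceId = new ConcurrentHashMap<>();

    /** Records the Master that dispatched the task. */
    public void register(int taskInstanceId, String masterHost) {
        hostByTaskInstanceId.put(taskInstanceId, masterHost);
    }

    /**
     * Called when a new Master takes over the workflow: re-point the running task at the
     * new host, but only if it is still owned by the failed one (atomic compare-and-set).
     */
    public boolean takeOver(int taskInstanceId, String failedHost, String newHost) {
        return hostByTaskInstanceId.replace(taskInstanceId, failedHost, newHost);
    }

    /** Host to which status and results should be reported, if the task is known. */
    public Optional<String> ownerOf(int taskInstanceId) {
        return Optional.ofNullable(hostByTaskInstanceId.get(taskInstanceId));
    }

    public static void main(String[] args) {
        WorkflowHostRegistry registry = new WorkflowHostRegistry();
        registry.register(42, "master-a:5678");
        System.out.println(registry.takeOver(42, "master-a:5678", "master-b:5678"));
        System.out.println(registry.ownerOf(42).orElse("none"));
    }
}
```

Using `ConcurrentHashMap.replace(key, oldValue, newValue)` keeps a stale or duplicate take-over request from overwriting an owner that has already changed.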



-- 
This is an automated message from the Apache Git Service.