This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 158322b  add TaskResponseProcessor (#1983)
158322b is described below

commit 158322bd41d925eaab0c5f4ce7b06d6fa6fae2b9
Author: Tboy <[email protected]>
AuthorDate: Fri Feb 21 09:01:42 2020 +0800

    add TaskResponseProcessor (#1983)
---
 .../common/enums/ExecutionStatus.java              |   9 ++
 .../server/master/MasterServer.java                |  15 ++
 .../server/master/future/TaskFuture.java           | 171 +++++++++++++++++++++
 .../master/processor/TaskResponseProcessor.java    |  59 +++++++
 .../master/runner/MasterBaseTaskExecThread.java    |  44 ++----
 .../server/worker/WorkerServer.java                |   5 +-
 .../server/worker/runner/TaskScheduleThread.java   |  20 ---
 7 files changed, 273 insertions(+), 50 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 1270252..1c336c8 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -128,4 +128,13 @@ public enum ExecutionStatus {
     public String getDescp() {
         return descp;
     }
+
+    public static ExecutionStatus of(int status){
+        for(ExecutionStatus es : values()){
+            if(es.getCode() == status){
+                return es;
+            }
+        }
+        throw new IllegalArgumentException("invalid status : " + status);
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6b5063c..0a153e7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -23,7 +23,11 @@ import 
org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -90,6 +94,8 @@ public class MasterServer implements IStoppable {
     @Autowired
     private SpringApplicationContext springApplicationContext;
 
+    private NettyRemotingServer nettyRemotingServer;
+
 
     /**
      * master server startup
@@ -108,6 +114,15 @@ public class MasterServer implements IStoppable {
      */
     @PostConstruct
     public void run(){
+
+        //
+        //init remoting server
+        NettyServerConfig serverConfig = new NettyServerConfig();
+        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, 
new TaskResponseProcessor(processService));
+        this.nettyRemotingServer.start();
+
+        //
         zkMasterClient.init();
 
         masterSchedulerService = 
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
new file mode 100644
index 0000000..32fb55f
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.future;
+
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TaskFuture {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(TaskFuture.class);
+
+    private final static ConcurrentHashMap<Long,TaskFuture> FUTURE_TABLE = new 
ConcurrentHashMap<>(256);
+
+    /**
+     *  request unique identification
+     */
+    private final long opaque;
+
+    /**
+     *  timeout
+     */
+    private final long timeoutMillis;
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    private final long beginTimestamp = System.currentTimeMillis();
+
+    /**
+     *  response command
+     */
+    private volatile Command responseCommand;
+
+    private volatile boolean sendOk = true;
+
+    private volatile Throwable cause;
+
+    public TaskFuture(long opaque, long timeoutMillis) {
+        this.opaque = opaque;
+        this.timeoutMillis = timeoutMillis;
+        FUTURE_TABLE.put(opaque, this);
+    }
+
+    /**
+     *  wait for response
+     * @return command
+     * @throws InterruptedException
+     */
+    public Command waitResponse() throws InterruptedException {
+        this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        return this.responseCommand;
+    }
+
+    /**
+     *  put response
+     *
+     * @param responseCommand responseCommand
+     */
+    public void putResponse(final Command responseCommand) {
+        this.responseCommand = responseCommand;
+        this.latch.countDown();
+        FUTURE_TABLE.remove(opaque);
+    }
+
+    /**
+     *  whether timeout
+     * @return timeout
+     */
+    public boolean isTimeout() {
+        long diff = System.currentTimeMillis() - this.beginTimestamp;
+        return diff > this.timeoutMillis;
+    }
+
+    public static void notify(final Command responseCommand){
+        TaskFuture taskFuture = 
FUTURE_TABLE.remove(responseCommand.getOpaque());
+        if(taskFuture != null){
+            taskFuture.putResponse(responseCommand);
+        }
+    }
+
+
+    public boolean isSendOK() {
+        return sendOk;
+    }
+
+    public void setSendOk(boolean sendOk) {
+        this.sendOk = sendOk;
+    }
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public long getOpaque() {
+        return opaque;
+    }
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    public Command getResponseCommand() {
+        return responseCommand;
+    }
+
+    public void setResponseCommand(Command responseCommand) {
+        this.responseCommand = responseCommand;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ResponseFuture{" +
+                "opaque=" + opaque +
+                ", timeoutMillis=" + timeoutMillis +
+                ", latch=" + latch +
+                ", beginTimestamp=" + beginTimestamp +
+                ", responseCommand=" + responseCommand +
+                ", sendOk=" + sendOk +
+                ", cause=" + cause +
+                '}';
+    }
+
+    /**
+     * scan future table
+     */
+    public static void scanFutureTable(){
+        final List<TaskFuture> futureList = new LinkedList<>();
+        Iterator<Map.Entry<Long, TaskFuture>> it = 
FUTURE_TABLE.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Long, TaskFuture> next = it.next();
+            TaskFuture future = next.getValue();
+            if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 
1000) <= System.currentTimeMillis()) {
+                futureList.add(future);
+                it.remove();
+                LOGGER.warn("remove timeout request : {}", future);
+            }
+        }
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
new file mode 100644
index 0000000..0dd45f0
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.master.future.TaskFuture;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  task response processor
+ */
+public class TaskResponseProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = 
LoggerFactory.getLogger(TaskResponseProcessor.class);
+
+    /**
+     * process service
+     */
+    private final ProcessService processService;
+
+    public TaskResponseProcessor(ProcessService processService){
+        this.processService = processService;
+    }
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
+        logger.info("received command : {}", command);
+        ExecuteTaskResponseCommand responseCommand = 
FastJsonSerializer.deserialize(command.getBody(), 
ExecuteTaskResponseCommand.class);
+        
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), 
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
+        TaskFuture.notify(command);
+    }
+
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 8104c42..a261b34 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -16,14 +16,15 @@
  */
 package org.apache.dolphinscheduler.server.master.runner;
 
-import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.BeanContext;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.*;
-import 
org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Address;
@@ -121,31 +122,20 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
 
 
     // TODO send task to worker
-    public void sendToWorker(String taskInstanceJson){
+    public void sendToWorker(TaskInstance taskInstance){
         final Address address = new Address("127.0.0.1", 12346);
-        ExecuteTaskRequestCommand taskRequestCommand = new 
ExecuteTaskRequestCommand(taskInstanceJson);
+        ExecuteTaskRequestCommand taskRequestCommand = new 
ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
         try {
-            Command responseCommand = nettyRemotingClient.sendSync(address,
-                    taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
-
-            logger.info("receive command : {}", responseCommand);
-
-            final CommandType commandType = responseCommand.getType();
-            switch (commandType){
-                case EXECUTE_TASK_ACK:
-                    ExecuteTaskAckCommand taskAckCommand = 
FastJsonSerializer.deserialize(
-                            responseCommand.getBody(), 
ExecuteTaskAckCommand.class);
-                    logger.info("taskAckCommand : {}",taskAckCommand);
-                    break;
-                case EXECUTE_TASK_RESPONSE:
-                    ExecuteTaskResponseCommand taskResponseCommand = 
FastJsonSerializer.deserialize(
-                            responseCommand.getBody(), 
ExecuteTaskResponseCommand.class);
-                    logger.info("taskResponseCommand : 
{}",taskResponseCommand);
-                    break;
-                default:
-                    throw new IllegalArgumentException("unknown commandType");
-            }
-            logger.info("response result : {}",responseCommand);
+            Command responseCommand = nettyRemotingClient.sendSync(address, 
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
+            ExecuteTaskAckCommand taskAckCommand = 
FastJsonSerializer.deserialize(responseCommand.getBody(), 
ExecuteTaskAckCommand.class);
+            logger.info("taskAckCommand : {}",taskAckCommand);
+            
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+                    taskAckCommand.getStartTime(),
+                    taskAckCommand.getHost(),
+                    taskAckCommand.getExecutePath(),
+                    taskAckCommand.getLogPath(),
+                    taskInstance.getId());
+
         } catch (InterruptedException | RemotingException ex) {
             logger.error(String.format("send command to : %s error", address), 
ex);
         }
@@ -174,7 +164,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
                 }
                 if(submitDB && !submitQueue){
                     // submit task to queue
-                    sendToWorker(JSONObject.toJSONString(task));
+                    sendToWorker(task);
                     submitQueue = true;
                 }
                 if(submitDB && submitQueue){
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 340a11a..2625d68 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -171,9 +171,8 @@ public class WorkerServer implements IStoppable {
         
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, 
new WorkerRequestProcessor(processService));
         this.nettyRemotingServer.start();
 
-        // TODO ,because there is a heartbeat, you can reuse the heartbeat 
logic,worker registry
-//        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort());
-//        this.workerRegistry.registry();
+        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort());
+        this.workerRegistry.registry();
 
         this.zkWorkerClient.init();
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index bb233b5..349e762 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -96,9 +96,6 @@ public class TaskScheduleThread implements Runnable {
     @Override
     public void run() {
 
-        // TODO Need to be removed and kept temporarily update task instance 
state
-        updateTaskState(taskInstance.getTaskType());
-
         ExecuteTaskResponseCommand responseCommand = new 
ExecuteTaskResponseCommand(taskInstance.getId());
 
         try {
@@ -167,31 +164,14 @@ public class TaskScheduleThread implements Runnable {
             responseCommand.setStatus(task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
             logger.info("task instance id : {},task final status : {}", 
taskInstance.getId(), task.getExitStatus());
-
         }catch (Exception e){
             logger.error("task scheduler failure", e);
             kill();
-
-            //TODO Need to be removed and kept temporarily update task 
instance state
-            processService.changeTaskState(ExecutionStatus.FAILURE,
-                    new Date(),
-                    taskInstance.getId());
-
             responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
             responseCommand.setEndTime(new Date());
-
         } finally {
             taskInstanceCallbackService.sendResult(taskInstance.getId(), 
responseCommand);
         }
-
-        logger.info("task instance id : {},task final status : {}",
-                taskInstance.getId(),
-                task.getExitStatus());
-        // update task instance state
-        processService.changeTaskState(task.getExitStatus(),
-                new Date(),
-                taskInstance.getId());
-
     }
 
     /**

Reply via email to