This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 158322b add TaskResponseProcessor (#1983)
158322b is described below
commit 158322bd41d925eaab0c5f4ce7b06d6fa6fae2b9
Author: Tboy <[email protected]>
AuthorDate: Fri Feb 21 09:01:42 2020 +0800
add TaskResponseProcessor (#1983)
---
.../common/enums/ExecutionStatus.java | 9 ++
.../server/master/MasterServer.java | 15 ++
.../server/master/future/TaskFuture.java | 171 +++++++++++++++++++++
.../master/processor/TaskResponseProcessor.java | 59 +++++++
.../master/runner/MasterBaseTaskExecThread.java | 44 ++----
.../server/worker/WorkerServer.java | 5 +-
.../server/worker/runner/TaskScheduleThread.java | 20 ---
7 files changed, 273 insertions(+), 50 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
index 1270252..1c336c8 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
@@ -128,4 +128,13 @@ public enum ExecutionStatus {
public String getDescp() {
return descp;
}
+
+ public static ExecutionStatus of(int status){
+ for(ExecutionStatus es : values()){
+ if(es.getCode() == status){
+ return es;
+ }
+ }
+ throw new IllegalArgumentException("invalid status : " + status);
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6b5063c..0a153e7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -23,7 +23,11 @@ import
org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -90,6 +94,8 @@ public class MasterServer implements IStoppable {
@Autowired
private SpringApplicationContext springApplicationContext;
+ private NettyRemotingServer nettyRemotingServer;
+
/**
* master server startup
@@ -108,6 +114,15 @@ public class MasterServer implements IStoppable {
*/
@PostConstruct
public void run(){
+
+ //
+ //init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE,
new TaskResponseProcessor(processService));
+ this.nettyRemotingServer.start();
+
+ //
zkMasterClient.init();
masterSchedulerService =
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
new file mode 100644
index 0000000..32fb55f
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.future;
+
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pending result of a task request identified by its opaque id.
+ *
+ * <p>Each instance registers itself in a static table on construction. {@link #notify(Command)}
+ * looks the future up by the response's opaque id, removes it and completes it. Waiters block in
+ * {@link #waitResponse()} until then, or until {@code timeoutMillis} elapses. Entries that are never
+ * answered are dropped when their waiter gives up, or later by {@link #scanFutureTable()}.
+ */
+public class TaskFuture {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class);
+
+    /**
+     * pending futures keyed by request opaque
+     */
+    private final static ConcurrentHashMap<Long, TaskFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
+
+    /**
+     * grace period, in milliseconds, added to a future's timeout before scanFutureTable evicts it
+     */
+    private final static long SCAN_GRACE_MILLIS = 1000L;
+
+    /**
+     * request unique identification
+     */
+    private final long opaque;
+
+    /**
+     * maximum time, in milliseconds, that waitResponse blocks
+     */
+    private final long timeoutMillis;
+
+    /**
+     * released exactly once, when a response is put
+     */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    private final long beginTimestamp = System.currentTimeMillis();
+
+    /**
+     * response command
+     */
+    private volatile Command responseCommand;
+
+    private volatile boolean sendOk = true;
+
+    private volatile Throwable cause;
+
+    /**
+     * Create a future and register it under {@code opaque}.
+     *
+     * @param opaque request unique identification
+     * @param timeoutMillis maximum time {@link #waitResponse()} blocks, in milliseconds
+     */
+    public TaskFuture(long opaque, long timeoutMillis) {
+        this.opaque = opaque;
+        this.timeoutMillis = timeoutMillis;
+        // registered last, after every field has been assigned
+        FUTURE_TABLE.put(opaque, this);
+    }
+
+    /**
+     * Wait for the response, for at most {@code timeoutMillis}.
+     *
+     * <p>If no response arrives in time, or the wait is interrupted, this future is removed from the
+     * pending table so it does not leak while waiting for scanFutureTable.
+     *
+     * @return the response command, or null if none arrived before the timeout
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public Command waitResponse() throws InterruptedException {
+        boolean completed = false;
+        try {
+            completed = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        } finally {
+            if (!completed) {
+                // nobody waits on this opaque any more; remove only this instance
+                FUTURE_TABLE.remove(opaque, this);
+            }
+        }
+        return this.responseCommand;
+    }
+
+    /**
+     * Store the response, release waiters and unregister this future.
+     *
+     * @param responseCommand responseCommand
+     */
+    public void putResponse(final Command responseCommand) {
+        this.responseCommand = responseCommand;
+        this.latch.countDown();
+        // conditional remove: never evict a different future registered under the same opaque
+        FUTURE_TABLE.remove(opaque, this);
+    }
+
+    /**
+     * Whether more than {@code timeoutMillis} have passed since this future was created.
+     *
+     * @return true if timed out
+     */
+    public boolean isTimeout() {
+        long diff = System.currentTimeMillis() - this.beginTimestamp;
+        return diff > this.timeoutMillis;
+    }
+
+    /**
+     * Complete the pending future whose opaque matches the response, if any.
+     * A response with no pending future (already completed, abandoned or evicted) is ignored.
+     *
+     * @param responseCommand response received from the remote side
+     */
+    public static void notify(final Command responseCommand){
+        TaskFuture taskFuture = FUTURE_TABLE.remove(responseCommand.getOpaque());
+        if(taskFuture != null){
+            taskFuture.putResponse(responseCommand);
+        }
+    }
+
+    public boolean isSendOK() {
+        return sendOk;
+    }
+
+    public void setSendOk(boolean sendOk) {
+        this.sendOk = sendOk;
+    }
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public long getOpaque() {
+        return opaque;
+    }
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    public Command getResponseCommand() {
+        return responseCommand;
+    }
+
+    public void setResponseCommand(Command responseCommand) {
+        this.responseCommand = responseCommand;
+    }
+
+    @Override
+    public String toString() {
+        return "TaskFuture{" +
+                "opaque=" + opaque +
+                ", timeoutMillis=" + timeoutMillis +
+                ", latch=" + latch +
+                ", beginTimestamp=" + beginTimestamp +
+                ", responseCommand=" + responseCommand +
+                ", sendOk=" + sendOk +
+                ", cause=" + cause +
+                '}';
+    }
+
+    /**
+     * Evict every pending future whose timeout plus SCAN_GRACE_MILLIS has elapsed.
+     * Each eviction is logged at WARN level.
+     */
+    public static void scanFutureTable(){
+        final long now = System.currentTimeMillis();
+        Iterator<Map.Entry<Long, TaskFuture>> it = FUTURE_TABLE.entrySet().iterator();
+        while (it.hasNext()) {
+            TaskFuture future = it.next().getValue();
+            if (future.getBeginTimestamp() + future.getTimeoutMillis() + SCAN_GRACE_MILLIS <= now) {
+                it.remove();
+                LOGGER.warn("remove timeout request : {}", future);
+            }
+        }
+    }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
new file mode 100644
index 0000000..0dd45f0
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.master.future.TaskFuture;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * task response processor
+ */
+public class TaskResponseProcessor implements NettyRequestProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TaskResponseProcessor.class);
+
+    /**
+     * process service
+     */
+    private final ProcessService processService;
+
+    /**
+     * @param processService service used to persist the task state carried by each response
+     */
+    public TaskResponseProcessor(ProcessService processService){
+        this.processService = processService;
+    }
+
+    /**
+     * Handle an EXECUTE_TASK_RESPONSE command.
+     *
+     * <p>Deserializes the body as an ExecuteTaskResponseCommand and persists the reported status and
+     * end time for its task instance. Then it completes any {@link TaskFuture} waiting on the
+     * command's opaque. The future is completed even if decoding or persisting fails, so waiters are
+     * not left hanging until their timeout; the failure still propagates to the caller.
+     *
+     * @param channel channel the command arrived on (not used here)
+     * @param command command whose body is a serialized ExecuteTaskResponseCommand
+     * @throws IllegalArgumentException if the reported status code is unknown (see ExecutionStatus.of);
+     *         a command of another type is rejected by Preconditions.checkArgument before anything else runs
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(),
+                String.format("invalid command type : %s", command.getType()));
+        LOGGER.info("received command : {}", command);
+        try {
+            ExecuteTaskResponseCommand responseCommand =
+                    FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
+            processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
+                    responseCommand.getEndTime(),
+                    responseCommand.getTaskInstanceId());
+        } finally {
+            // always release a waiter for this opaque, even when the state update failed
+            TaskFuture.notify(command);
+        }
+    }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 8104c42..a261b34 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -16,14 +16,15 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.*;
-import
org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
@@ -121,31 +122,20 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
// TODO send task to worker
- public void sendToWorker(String taskInstanceJson){
+ public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
- ExecuteTaskRequestCommand taskRequestCommand = new
ExecuteTaskRequestCommand(taskInstanceJson);
+ ExecuteTaskRequestCommand taskRequestCommand = new
ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
try {
- Command responseCommand = nettyRemotingClient.sendSync(address,
- taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
-
- logger.info("receive command : {}", responseCommand);
-
- final CommandType commandType = responseCommand.getType();
- switch (commandType){
- case EXECUTE_TASK_ACK:
- ExecuteTaskAckCommand taskAckCommand =
FastJsonSerializer.deserialize(
- responseCommand.getBody(),
ExecuteTaskAckCommand.class);
- logger.info("taskAckCommand : {}",taskAckCommand);
- break;
- case EXECUTE_TASK_RESPONSE:
- ExecuteTaskResponseCommand taskResponseCommand =
FastJsonSerializer.deserialize(
- responseCommand.getBody(),
ExecuteTaskResponseCommand.class);
- logger.info("taskResponseCommand :
{}",taskResponseCommand);
- break;
- default:
- throw new IllegalArgumentException("unknown commandType");
- }
- logger.info("response result : {}",responseCommand);
+ Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
+ ExecuteTaskAckCommand taskAckCommand =
FastJsonSerializer.deserialize(responseCommand.getBody(),
ExecuteTaskAckCommand.class);
+ logger.info("taskAckCommand : {}",taskAckCommand);
+
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+ taskAckCommand.getStartTime(),
+ taskAckCommand.getHost(),
+ taskAckCommand.getExecutePath(),
+ taskAckCommand.getLogPath(),
+ taskInstance.getId());
+
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address),
ex);
}
@@ -174,7 +164,7 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
if(submitDB && !submitQueue){
// submit task to queue
- sendToWorker(JSONObject.toJSONString(task));
+ sendToWorker(task);
submitQueue = true;
}
if(submitDB && submitQueue){
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 340a11a..2625d68 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -171,9 +171,8 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST,
new WorkerRequestProcessor(processService));
this.nettyRemotingServer.start();
- // TODO ,because there is a heartbeat, you can reuse the heartbeat
logic,worker registry
-// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort());
-// this.workerRegistry.registry();
+ this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort());
+ this.workerRegistry.registry();
this.zkWorkerClient.init();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index bb233b5..349e762 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -96,9 +96,6 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
- // TODO Need to be removed and kept temporarily update task instance
state
- updateTaskState(taskInstance.getTaskType());
-
ExecuteTaskResponseCommand responseCommand = new
ExecuteTaskResponseCommand(taskInstance.getId());
try {
@@ -167,31 +164,14 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(), task.getExitStatus());
-
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
-
- //TODO Need to be removed and kept temporarily update task
instance state
- processService.changeTaskState(ExecutionStatus.FAILURE,
- new Date(),
- taskInstance.getId());
-
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
-
} finally {
taskInstanceCallbackService.sendResult(taskInstance.getId(),
responseCommand);
}
-
- logger.info("task instance id : {},task final status : {}",
- taskInstance.getId(),
- task.getExitStatus());
- // update task instance state
- processService.changeTaskState(task.getExitStatus(),
- new Date(),
- taskInstance.getId());
-
}
/**