This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8850baf [DS-6640][WorkerServer] support process update host command type (#6642)
8850baf is described below
commit 8850baff07cc5174a57a0560cbeeb1a3368a4dca
Author: wind <[email protected]>
AuthorDate: Sun Oct 31 21:10:51 2021 +0800
[DS-6640][WorkerServer] support process update host command type (#6642)
Co-authored-by: caishunfeng <[email protected]>
---
.../java/org/apache/dolphinscheduler/remote/command/CommandType.java | 2 +-
.../org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java | 2 +-
.../dolphinscheduler/remote/command/HostUpdateResponseCommand.java | 2 +-
.../java/org/apache/dolphinscheduler/server/worker/WorkerServer.java | 2 ++
.../dolphinscheduler/server/worker/processor/HostUpdateProcessor.java | 2 +-
5 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 4301910..786d10c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -126,7 +126,7 @@ public enum CommandType {
/**
* process host update
*/
- PROCESS_HOST_UPDATE_REQUST,
+ PROCESS_HOST_UPDATE_REQUEST,
/**
* process host update response
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
index d70124b..4fc752e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
@@ -56,7 +56,7 @@ public class HostUpdateCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+ command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
index ddf4fc2..b44856c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
@@ -66,7 +66,7 @@ public class HostUpdateResponseCommand implements
Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
+ command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 7c03f22..50d2eab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -28,6 +28,7 @@ import
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new
TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK,
new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new
DBTaskResponseProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
new HostUpdateProcessor());
this.nettyRemotingServer.start();
// worker registry
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 5be3276..8928d50 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -51,7 +51,7 @@ public class HostUpdateProcessor implements
NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
+ Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST ==
command.getType(), String.format("invalid command type : %s",
command.getType()));
HostUpdateCommand updateCommand =
JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
logger.info("received host update command : {}", updateCommand);
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new
NettyRemoteChannel(channel, command.getOpaque()));