caishunfeng commented on code in PR #10886:
URL: https://github.com/apache/dolphinscheduler/pull/10886#discussion_r918524582


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java:
##########
@@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements 
NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
-        TaskExecuteRunningCommand taskExecuteRunningCommand = 
JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
-        logger.info("taskExecuteRunningCommand: {}", 
taskExecuteRunningCommand);
+        TaskExecuteRunningCommand taskExecuteRunningMessage = 
JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);

Review Comment:
   nit: revert the variable name



##########
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java:
##########
@@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
     private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
     private TaskEventService taskEventService;
     private ProcessService processService;
-    private TaskExecuteRunningCommand taskExecuteRunningCommand;
+    private TaskExecuteRunningCommand taskExecuteRunningMessage;

Review Comment:
   same here



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java:
##########
@@ -76,201 +59,201 @@ public TaskCallbackService() {
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
 taskExecuteRunningProcessor);
-        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK,
 taskExecuteResponseAckProcessor);
-    }
-
-    /**
-     * add callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param channel channel
-     */
-    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel 
channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * change remote channel
-     */
-    public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel 
channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * get callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return callback channel
-     */
-    private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
-        Channel newChannel;
-        NettyRemoteChannel nettyRemoteChannel = 
REMOTE_CHANNELS.get(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            if (nettyRemoteChannel.isActive()) {
-                return Optional.of(nettyRemoteChannel);
-            }
-            newChannel = 
nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
-            if (newChannel != null) {
-                return Optional.of(getRemoteChannel(newChannel, 
nettyRemoteChannel.getOpaque(), taskInstanceId));
-            }
-        }
-        return Optional.empty();
-    }
-
-    public long pause(int ntries) {
-        return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % 
RETRY_BACKOFF.length];
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long 
opaque, int taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, 
opaque);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, int 
taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
+        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, 
taskExecuteResultAckProcessor);
     }
 
-    /**
-     * remove callback channels
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public static void remove(int taskInstanceId) {
-        REMOTE_CHANNELS.remove(taskInstanceId);
-    }
-
-    /**
-     * send result
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param command command
-     */
-    public void send(int taskInstanceId, Command command) {
-        Optional<NettyRemoteChannel> nettyRemoteChannel = 
getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel.isPresent()) {
-            nettyRemoteChannel.get().writeAndFlush(command).addListener(new 
ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) {
-                    if (!future.isSuccess()) {
-                        logger.error("Send callback command error, 
taskInstanceId: {}, command: {}", taskInstanceId, command);
-                    }
-                }
-            });
-        } else {
-            logger.warn("Remote channel of taskInstanceId is null: {}, cannot 
send command: {}", taskInstanceId, command);
-        }
-    }
-
-    /**
-     * build task execute running command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteAckCommand
-     */
-    private TaskExecuteRunningCommand 
buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build task execute response command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteResponseCommand
-     */
-    private TaskExecuteResponseCommand 
buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
-        
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        command.setAppIds(taskExecutionContext.getAppIds());
-        command.setProcessId(taskExecutionContext.getProcessId());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setEndTime(taskExecutionContext.getEndTime());
-        command.setVarPool(taskExecutionContext.getVarPool());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build TaskKillResponseCommand
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return build TaskKillResponseCommand
-     */
-    private TaskKillResponseCommand 
buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = new 
TaskKillResponseCommand();
-        
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
-        
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
-        
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
-        return taskKillResponseCommand;
-    }
-
-    private TaskRecallCommand buildRecallCommand(TaskExecutionContext 
taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
-        
taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        
taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        taskRecallCommand.setHost(taskExecutionContext.getHost());
-        return taskRecallCommand;
-    }
-
-    /**
-     * send task execute running command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteRunningCommand(TaskExecutionContext 
taskExecutionContext) {
-        TaskExecuteRunningCommand command = 
buildTaskExecuteRunningCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), TaskEventType.RUNNING);
-        send(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command());
-    }
-
-    /**
-     * send task execute delay command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteDelayCommand(TaskExecutionContext 
taskExecutionContext) {
-        TaskExecuteRunningCommand command = 
buildTaskExecuteRunningCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteResponseCommand(TaskExecutionContext 
taskExecutionContext) {
-        TaskExecuteResponseCommand command = 
buildTaskExecuteResponseCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command(), TaskEventType.RESULT);
-        send(taskExecutionContext.getTaskInstanceId(), 
command.convert2Command());
-    }
-
-    public void sendTaskKillResponseCommand(TaskExecutionContext 
taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = 
buildKillTaskResponseCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), 
taskKillResponseCommand.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     */
-    public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = 
buildRecallCommand(taskExecutionContext);
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
-        send(taskExecutionContext.getTaskInstanceId(), 
taskRecallCommand.convert2Command());
-    }
+    //    /**

Review Comment:
   Maybe this commented-out code should be removed.



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+
+public interface MessageSender<T extends BaseCommand> {

Review Comment:
   :+1: 



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.message.MessageSender;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+@Component
+public class WorkerMessageSender {
+
+    private final Logger logger = 
LoggerFactory.getLogger(WorkerMessageSender.class);
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
+
+    @PostConstruct
+    public void init() {
+        Map<String, MessageSender> messageSenders = 
applicationContext.getBeansOfType(MessageSender.class);
+        messageSenders.values().forEach(messageSender -> 
messageSenderMap.put(messageSender.getMessageType(),
+                                                                              
messageSender));
+    }
+
+    // todo: use message rather than context
+    public void sendMessageNeedAck(@NonNull TaskExecutionContext 
taskExecutionContext,

Review Comment:
   ```suggestion
       public void sendMessageWithRetry(@NonNull TaskExecutionContext 
taskExecutionContext,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to