This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 939fa0c  fix TaskResponseProcessor process NullPointerException (#7318)
939fa0c is described below

commit 939fa0c3bac50b43a519dfd3f4a6b8db1be92e92
Author: Kerwin <[email protected]>
AuthorDate: Fri Dec 10 16:55:56 2021 +0800

    fix TaskResponseProcessor process NullPointerException (#7318)
---
 .../server/master/MasterServer.java                | 22 +++++++++++++++++-----
 .../dispatch/executor/NettyExecutorManager.java    |  5 ++++-
 .../server/master/processor/CacheProcessor.java    |  7 ++++---
 .../master/processor/StateEventProcessor.java      |  8 ++++----
 .../master/processor/TaskEventProcessor.java       |  8 ++++----
 .../master/processor/TaskResponseProcessor.java    |  2 ++
 .../master/processor/CacheProcessorTest.java       | 12 +++---------
 7 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9157ad7..41bc63a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -77,6 +77,18 @@ public class MasterServer implements IStoppable {
     @Autowired
     private TaskAckProcessor taskAckProcessor;
 
+    @Autowired
+    private TaskResponseProcessor taskResponseProcessor;
+
+    @Autowired
+    private TaskEventProcessor taskEventProcessor;
+
+    @Autowired
+    private StateEventProcessor stateEventProcessor;
+
+    @Autowired
+    private CacheProcessor cacheProcessor;
+
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
         SpringApplication.run(MasterServer.class);
@@ -91,13 +103,13 @@ public class MasterServer implements IStoppable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
         this.nettyRemotingServer.start();
 
         // self tolerant
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 886a7fc..b0b8d25 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -62,6 +62,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
     @Autowired
     private TaskAckProcessor taskAckProcessor;
 
+    @Autowired
+    private TaskResponseProcessor taskResponseProcessor;
+
     /**
      * netty remote client
      */
@@ -81,7 +84,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
          * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
          * register EXECUTE_TASK_ACK command type TaskAckProcessor
          */
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
         this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
         this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 6db7f65..0712162 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cache.Cache;
 import org.springframework.cache.CacheManager;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -37,10 +39,12 @@ import io.netty.channel.Channel;
 /**
  * cache process from master/api
  */
+@Component
 public class CacheProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
 
+    @Autowired
     private CacheManager cacheManager;
 
     @Override
@@ -55,9 +59,6 @@ public class CacheProcessor implements NettyRequestProcessor {
     }
 
     private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
-        if (cacheManager == null) {
-            cacheManager = SpringApplicationContext.getBean(CacheManager.class);
-        }
 
         if (cacheExpireCommand.getCacheKey().isEmpty()) {
             return;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index b039403..6b4c3da 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -38,16 +40,14 @@ import io.netty.channel.Channel;
 /**
  * handle state event received from master/api
  */
+@Component
 public class StateEventProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class);
 
+    @Autowired
     private StateEventResponseService stateEventResponseService;
 
-    public StateEventProcessor() {
-        stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
-    }
-
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index 2200e83..64a22cb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -37,16 +39,14 @@ import io.netty.channel.Channel;
 /**
  * handle state event received from master/api
  */
+@Component
 public class TaskEventProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class);
 
+    @Autowired
     private StateEventResponseService stateEventResponseService;
 
-    public TaskEventProcessor() {
-        stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
-    }
-
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 966a07d..a367cfe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -34,10 +34,12 @@ import com.google.common.base.Preconditions;
 
 import io.netty.channel.Channel;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * task response processor
  */
+@Component
 public class TaskResponseProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 5c177ca..20befa0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -21,15 +21,13 @@ import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.cache.Cache;
 import org.springframework.cache.CacheManager;
@@ -40,10 +38,9 @@ import io.netty.channel.Channel;
  * task ack processor test
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
 public class CacheProcessorTest {
-
-    private CacheProcessor cacheProcessor;
+    @InjectMocks
+    private CacheProcessor cacheProcessor = new CacheProcessor();
 
     @Mock
     private Channel channel;
@@ -56,10 +53,7 @@ public class CacheProcessorTest {
 
     @Before
     public void before() {
-        PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
         Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
-        cacheProcessor = new CacheProcessor();
     }
 
     @Test

Reply via email to