This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 939fa0c fix TaskResponseProcessor process NullPointerException (#7318)
939fa0c is described below
commit 939fa0c3bac50b43a519dfd3f4a6b8db1be92e92
Author: Kerwin <[email protected]>
AuthorDate: Fri Dec 10 16:55:56 2021 +0800
fix TaskResponseProcessor process NullPointerException (#7318)
---
.../server/master/MasterServer.java | 22 +++++++++++++++++-----
.../dispatch/executor/NettyExecutorManager.java | 5 ++++-
.../server/master/processor/CacheProcessor.java | 7 ++++---
.../master/processor/StateEventProcessor.java | 8 ++++----
.../master/processor/TaskEventProcessor.java | 8 ++++----
.../master/processor/TaskResponseProcessor.java | 2 ++
.../master/processor/CacheProcessorTest.java | 12 +++---------
7 files changed, 38 insertions(+), 26 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9157ad7..41bc63a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -77,6 +77,18 @@ public class MasterServer implements IStoppable {
@Autowired
private TaskAckProcessor taskAckProcessor;
+ @Autowired
+ private TaskResponseProcessor taskResponseProcessor;
+
+ @Autowired
+ private TaskEventProcessor taskEventProcessor;
+
+ @Autowired
+ private StateEventProcessor stateEventProcessor;
+
+ @Autowired
+ private CacheProcessor cacheProcessor;
+
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@@ -91,13 +103,13 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.start();
// self tolerant
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 886a7fc..b0b8d25 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -62,6 +62,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
@Autowired
private TaskAckProcessor taskAckProcessor;
+ @Autowired
+ private TaskResponseProcessor taskResponseProcessor;
+
/**
* netty remote client
*/
@@ -81,7 +84,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 6db7f65..0712162 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -27,8 +27,10 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -37,10 +39,12 @@ import io.netty.channel.Channel;
/**
* cache process from master/api
*/
+@Component
public class CacheProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(CacheProcessor.class);
+ @Autowired
private CacheManager cacheManager;
@Override
@@ -55,9 +59,6 @@ public class CacheProcessor implements NettyRequestProcessor {
}
private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
- if (cacheManager == null) {
- cacheManager = SpringApplicationContext.getBean(CacheManager.class);
- }
if (cacheExpireCommand.getCacheKey().isEmpty()) {
return;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index b039403..6b4c3da 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -30,6 +30,8 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -38,16 +40,14 @@ import io.netty.channel.Channel;
/**
* handle state event received from master/api
*/
+@Component
public class StateEventProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(StateEventProcessor.class);
+ @Autowired
private StateEventResponseService stateEventResponseService;
- public StateEventProcessor() {
- stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
- }
-
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST ==
command.getType(), String.format("invalid command type: %s",
command.getType()));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index 2200e83..64a22cb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -29,6 +29,8 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -37,16 +39,14 @@ import io.netty.channel.Channel;
/**
* handle state event received from master/api
*/
+@Component
public class TaskEventProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(TaskEventProcessor.class);
+ @Autowired
private StateEventResponseService stateEventResponseService;
- public TaskEventProcessor() {
- stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
- }
-
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST
== command.getType()
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 966a07d..a367cfe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -34,10 +34,12 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
* task response processor
*/
+@Component
public class TaskResponseProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(TaskResponseProcessor.class);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 5c177ca..20befa0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -21,15 +21,13 @@ import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@@ -40,10 +38,9 @@ import io.netty.channel.Channel;
* task ack processor test
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
public class CacheProcessorTest {
-
- private CacheProcessor cacheProcessor;
+ @InjectMocks
+ private CacheProcessor cacheProcessor = new CacheProcessor();
@Mock
private Channel channel;
@@ -56,10 +53,7 @@ public class CacheProcessorTest {
@Before
public void before() {
- PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
- cacheProcessor = new CacheProcessor();
}
@Test