This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f88def8 [IMPROVEMENT-8178] Add Netty processor in Spring container (#8179)
f88def8 is described below
commit f88def8ef740702442f0e3ae1a9bd4a7a919aabb
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jan 25 21:33:09 2022 +0800
[IMPROVEMENT-8178] Add Netty processor in Spring container (#8179)
* Add Netty processor in Spring container
---
.gitignore | 1 +
.../server/log/LoggerRequestProcessor.java | 2 ++
.../server/master/MasterServer.java | 9 +++--
.../dispatch/executor/NettyExecutorManager.java | 9 +++--
.../processor/TaskKillResponseProcessor.java | 2 ++
.../service/alert/AlertClientService.java | 3 +-
.../server/worker/WorkerServer.java | 34 ++++++++++++------
.../server/worker/config/BeanConfig.java | 34 ++++++++++++++++++
.../worker/processor/DBTaskAckProcessor.java | 3 +-
.../worker/processor/DBTaskResponseProcessor.java | 2 ++
.../worker/processor/HostUpdateProcessor.java | 2 ++
.../worker/processor/TaskExecuteProcessor.java | 29 +++++++---------
.../server/worker/processor/TaskKillProcessor.java | 24 +++++--------
.../server/worker/config/BeanConfigTest.java | 40 ++++++++++++++++++++++
.../processor/TaskCallbackServiceTestConfig.java | 0
.../processor/TaskExecuteProcessorTest.java | 2 --
.../registry/WorkerRegistryClientTest.java | 0
17 files changed, 142 insertions(+), 54 deletions(-)
diff --git a/.gitignore b/.gitignore
index 3ddb824..edc8066 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ dolphinscheduler-ui/dist
dolphinscheduler-ui/node
dolphinscheduler-common/sql
dolphinscheduler-common/test
+dolphinscheduler-worker/logs
# ------------------
# pydolphinscheduler
diff --git
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index f6e23f0..6e85b21 100644
---
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -48,12 +48,14 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/**
* logger request process logic
*/
+@Component
public class LoggerRequestProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(LoggerRequestProcessor.class);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3021c02..22b4a69 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -90,11 +90,17 @@ public class MasterServer implements IStoppable {
private CacheProcessor cacheProcessor;
@Autowired
+ private TaskKillResponseProcessor taskKillResponseProcessor;
+
+ @Autowired
private EventExecuteService eventExecuteService;
@Autowired
private FailoverExecuteThread failoverExecuteThread;
+ @Autowired
+ private LoggerRequestProcessor loggerRequestProcessor;
+
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@@ -111,14 +117,13 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE,
taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST,
stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST,
taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,
taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE,
cacheProcessor);
// logger server
- LoggerRequestProcessor loggerRequestProcessor = new
LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,
loggerRequestProcessor);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index b0b8d25..521d0c4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -63,6 +63,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
private TaskAckProcessor taskAckProcessor;
@Autowired
+ private TaskKillResponseProcessor taskKillResponseProcessor;
+
+ @Autowired
private TaskResponseProcessor taskResponseProcessor;
/**
@@ -80,13 +83,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
@PostConstruct
public void init(){
- /**
- * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
- * register EXECUTE_TASK_ACK command type TaskAckProcessor
- */
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
taskResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
-
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
+
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE,
taskKillResponseProcessor);
}
/**
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 28f18fe..135257c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -33,6 +34,7 @@ import io.netty.channel.Channel;
/**
* task response processor
*/
+@Component
public class TaskKillResponseProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(TaskKillResponseProcessor.class);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index 49977fa..0aeb25b 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -28,7 +28,7 @@ import
org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AlertClientService {
+public class AlertClientService implements AutoCloseable {
private static final Logger logger =
LoggerFactory.getLogger(AlertClientService.class);
@@ -70,6 +70,7 @@ public class AlertClientService {
/**
* close
*/
+ @Override
public void close() {
this.client.close();
this.isRunning = false;
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 79e2e82..6296aeb 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -81,6 +81,7 @@ public class WorkerServer implements IStoppable {
/**
* alert model netty remote server
*/
+ @Autowired
private AlertClientService alertClientService;
@Autowired
@@ -98,6 +99,24 @@ public class WorkerServer implements IStoppable {
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private TaskExecuteProcessor taskExecuteProcessor;
+
+ @Autowired
+ private TaskKillProcessor taskKillProcessor;
+
+ @Autowired
+ private DBTaskAckProcessor dbTaskAckProcessor;
+
+ @Autowired
+ private DBTaskResponseProcessor dbTaskResponseProcessor;
+
+ @Autowired
+ private HostUpdateProcessor hostUpdateProcessor;
+
+ @Autowired
+ private LoggerRequestProcessor loggerRequestProcessor;
+
/**
* worker server startup, not use web service
*
@@ -113,22 +132,17 @@ public class WorkerServer implements IStoppable {
*/
@PostConstruct
public void run() {
- // alert-server client registry
- alertClientService = new
AlertClientService(workerConfig.getAlertListenHost(),
-
workerConfig.getAlertListenPort());
-
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST,
new TaskExecuteProcessor(alertClientService, taskPluginManager));
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new
TaskKillProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK,
new DBTaskAckProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new
DBTaskResponseProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
new HostUpdateProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST,
taskExecuteProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST,
taskKillProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK,
dbTaskAckProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE,
dbTaskResponseProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
hostUpdateProcessor);
// logger server
- LoggerRequestProcessor loggerRequestProcessor = new
LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,
loggerRequestProcessor);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
new file mode 100644
index 0000000..7b4c6d8
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.config;
+
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class BeanConfig {
+
+ @Bean
+ public AlertClientService alertClientService(WorkerConfig workerConfig) {
+ return new AlertClientService(
+ workerConfig.getAlertListenHost(),
+ workerConfig.getAlertListenPort());
+ }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
index 2c9bfe7..186b99d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -25,17 +25,18 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
/**
* db task ack processor
*/
+@Component
public class DBTaskAckProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(DBTaskAckProcessor.class);
-
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_ACK ==
command.getType(),
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index fba6729..07fbf06 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -27,6 +27,7 @@ import
org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -35,6 +36,7 @@ import io.netty.channel.Channel;
/**
* db task response processor
*/
+@Component
public class DBTaskResponseProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(DBTaskResponseProcessor.class);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 8928d50..aa12938 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -27,6 +27,7 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -36,6 +37,7 @@ import io.netty.channel.Channel;
* update process host
* this used when master failover
*/
+@Component
public class HostUpdateProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(HostUpdateProcessor.class);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 1f9770a..bca87c3 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -40,7 +40,6 @@ import
org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@@ -49,6 +48,8 @@ import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -57,6 +58,7 @@ import io.netty.channel.Channel;
/**
* worker request processor
*/
+@Component
public class TaskExecuteProcessor implements NettyRequestProcessor {
private static final Logger logger =
LoggerFactory.getLogger(TaskExecuteProcessor.class);
@@ -64,30 +66,29 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
/**
* worker config
*/
- private final WorkerConfig workerConfig;
+ @Autowired
+ private WorkerConfig workerConfig;
/**
* task callback service
*/
- private final TaskCallbackService taskCallbackService;
+ @Autowired
+ private TaskCallbackService taskCallbackService;
/**
* alert client service
*/
+ @Autowired
private AlertClientService alertClientService;
+ @Autowired
private TaskPluginManager taskPluginManager;
- /*
+ /**
* task execute manager
*/
- private final WorkerManagerThread workerManager;
-
- public TaskExecuteProcessor() {
- this.taskCallbackService =
SpringApplicationContext.getBean(TaskCallbackService.class);
- this.workerConfig =
SpringApplicationContext.getBean(WorkerConfig.class);
- this.workerManager =
SpringApplicationContext.getBean(WorkerManagerThread.class);
- }
+ @Autowired
+ private WorkerManagerThread workerManager;
/**
* Pre-cache task to avoid extreme situations when kill task. There is no
such task in the cache
@@ -101,12 +102,6 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskRequest);
}
- public TaskExecuteProcessor(AlertClientService alertClientService,
TaskPluginManager taskPluginManager) {
- this();
- this.alertClientService = alertClientService;
- this.taskPluginManager = taskPluginManager;
- }
-
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST ==
command.getType(),
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 96b51f3..d036110 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -30,9 +30,7 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@@ -45,6 +43,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -53,30 +53,22 @@ import io.netty.channel.Channel;
/**
* task kill processor
*/
+@Component
public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger =
LoggerFactory.getLogger(TaskKillProcessor.class);
/**
- * worker config
- */
- private final WorkerConfig workerConfig;
-
- /**
* task callback service
*/
- private final TaskCallbackService taskCallbackService;
+ @Autowired
+ private TaskCallbackService taskCallbackService;
- /*
+ /**
* task execute manager
*/
- private final WorkerManagerThread workerManager;
-
- public TaskKillProcessor() {
- this.taskCallbackService =
SpringApplicationContext.getBean(TaskCallbackService.class);
- this.workerConfig =
SpringApplicationContext.getBean(WorkerConfig.class);
- this.workerManager =
SpringApplicationContext.getBean(WorkerManagerThread.class);
- }
+ @Autowired
+ private WorkerManagerThread workerManager;
/**
* task kill process
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
new file mode 100644
index 0000000..85d208e
--- /dev/null
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.config;
+
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = {BeanConfig.class, WorkerConfig.class})
+public class BeanConfigTest {
+
+ @Autowired
+ private AlertClientService alertClientService;
+
+ @Test
+ public void alertClientService() {
+ Assert.assertNotNull(alertClientService);
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskCallbackServiceTestConfig.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
similarity index 100%
rename from
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskCallbackServiceTestConfig.java
rename to
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
similarity index 99%
rename from
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
rename to
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 5a345ab..b07ffa6 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -49,7 +49,6 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* test task execute processor
@@ -74,7 +73,6 @@ public class TaskExecuteProcessorTest {
private TaskExecuteRequestCommand taskRequestCommand;
-
private AlertClientService alertClientService;
private WorkerManagerThread workerManager;
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/registry/WorkerRegistryClientTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
similarity index 100%
rename from
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/registry/WorkerRegistryClientTest.java
rename to
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java