This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 23ba035 add TaskResponseServiceTest (#2325)
23ba035 is described below
commit 23ba035182a30700a3589f4bdd1212b523e89ea4
Author: Tboy <[email protected]>
AuthorDate: Fri Mar 27 22:03:09 2020 +0800
add TaskResponseServiceTest (#2325)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config
add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to SpringConnectionFactory
* refactor TaskCallbackService
* add ZookeeperNodeManagerTest
* add NettyExecutorManagerTest
* refactor TaskKillProcessor
* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest
* add RoundRobinHostManagerTest, ExecutorDispatcherTest
* refactor task response service
* add TaskResponseServiceTest
---
.../processor/queue/TaskResponseService.java | 100 +++++++++++++--------
.../processor/queue/TaskResponseServiceTest.java | 66 ++++++++++++++
2 files changed, 129 insertions(+), 37 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index c471c9c..b9772ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -25,6 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -37,12 +40,12 @@ public class TaskResponseService {
/**
* logger
*/
- private static final Logger logger =
LoggerFactory.getLogger(TaskResponseService.class);
+ private final Logger logger =
LoggerFactory.getLogger(TaskResponseService.class);
/**
* attemptQueue
*/
- private final BlockingQueue<TaskResponseEvent> attemptQueue = new
LinkedBlockingQueue<>(5000);
+ private final BlockingQueue<TaskResponseEvent> eventQueue = new
LinkedBlockingQueue<>(5000);
/**
@@ -51,12 +54,29 @@ public class TaskResponseService {
@Autowired
private ProcessService processService;
+ /**
+ * task response worker
+ */
+ private Thread taskResponseWorker;
+
@PostConstruct
- public void init(){
- TaskWorker taskWorker = new TaskWorker();
- taskWorker.setName("TaskWorkerThread");
- taskWorker.start();
+ public void start(){
+ this.taskResponseWorker = new TaskResponseWorker();
+ this.taskResponseWorker.setName("TaskResponseWorker");
+ this.taskResponseWorker.start();
+ }
+
+ @PreDestroy
+ public void stop(){
+ this.taskResponseWorker.interrupt();
+ if(!eventQueue.isEmpty()){
+ List<TaskResponseEvent> remainEvents = new
ArrayList<>(eventQueue.size());
+ eventQueue.drainTo(remainEvents);
+ for(TaskResponseEvent event : remainEvents){
+ this.persist(event);
+ }
+ }
}
/**
@@ -66,7 +86,7 @@ public class TaskResponseService {
*/
public void addResponse(TaskResponseEvent taskResponseEvent){
try {
- attemptQueue.put(taskResponseEvent);
+ eventQueue.put(taskResponseEvent);
} catch (InterruptedException e) {
logger.error("put task : {} error :{}", taskResponseEvent,e);
}
@@ -76,7 +96,7 @@ public class TaskResponseService {
/**
* task worker thread
*/
- class TaskWorker extends Thread {
+ class TaskResponseWorker extends Thread {
@Override
public void run() {
@@ -84,41 +104,47 @@ public class TaskResponseService {
while (Stopper.isRunning()){
try {
// if not task , blocking here
- TaskResponseEvent taskResponseEvent = attemptQueue.take();
+ TaskResponseEvent taskResponseEvent = eventQueue.take();
persist(taskResponseEvent);
-
- }catch (Exception e){
+ } catch (InterruptedException e){
+ break;
+ } catch (Exception e){
logger.error("persist task error",e);
}
}
+ logger.info("TaskResponseWorker stopped");
}
+ }
- /**
- * persist taskResponseEvent
- * @param taskResponseEvent taskResponseEvent
- */
- private void persist(TaskResponseEvent taskResponseEvent){
- TaskResponseEvent.Event event = taskResponseEvent.getEvent();
-
- switch (event){
- case ACK:
-
processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
- break;
- case RESULT:
-
processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId());
- break;
- default:
- throw new IllegalArgumentException("invalid event type : "
+ event);
- }
+ /**
+ * persist taskResponseEvent
+ * @param taskResponseEvent taskResponseEvent
+ */
+ private void persist(TaskResponseEvent taskResponseEvent){
+ TaskResponseEvent.Event event = taskResponseEvent.getEvent();
+
+ switch (event){
+ case ACK:
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath(),
+ taskResponseEvent.getTaskInstanceId());
+ break;
+ case RESULT:
+ processService.changeTaskState(taskResponseEvent.getState(),
+ taskResponseEvent.getEndTime(),
+ taskResponseEvent.getProcessId(),
+ taskResponseEvent.getAppIds(),
+ taskResponseEvent.getTaskInstanceId());
+ break;
+ default:
+ throw new IllegalArgumentException("invalid event type : " +
event);
}
}
+
+ public BlockingQueue<TaskResponseEvent> getEventQueue() {
+ return eventQueue;
+ }
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
new file mode 100644
index 0000000..dcba832
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.registry.DependencyConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.Date;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class,
TaskResponseService.class, ZookeeperRegistryCenter.class,
+ ZookeeperCachedOperator.class, ZookeeperConfig.class,
ZookeeperNodeManager.class, TaskResponseService.class})
+public class TaskResponseServiceTest {
+
+ @Autowired
+ private TaskResponseService taskResponseService;
+
+ @Test
+ public void testAdd(){
+ TaskResponseEvent taskResponseEvent =
TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
+ "", "", "", 1);
+ taskResponseService.addResponse(taskResponseEvent);
+ Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignore) {
+ }
+ //after sleep, inner worker will take the event
+ Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+ }
+
+ @Test
+ public void testStop(){
+ TaskResponseEvent taskResponseEvent =
TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
+ "", "", "", 1);
+ taskResponseService.addResponse(taskResponseEvent);
+ taskResponseService.stop();
+ Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
+ }
+}