This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new e720ca5 Refactor worker (#2312)
e720ca5 is described below
commit e720ca54630817e0950d5dd6afab7fe2c47c9258
Author: Tboy <[email protected]>
AuthorDate: Thu Mar 26 10:20:20 2020 +0800
Refactor worker (#2312)
* let quartz use the same datasource
* move master/worker config from dao.properties to each config
add master/worker registry test
* move mybatis config from application.properties to SpringConnectionFactory
* move mybatis-plus config from application.properties to
SpringConnectionFactory
* refactor TaskCallbackService
* add ZookeeperNodeManagerTest
* add NettyExecutorManagerTest
* refactor TaskKillProcessor
* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest
* add RoundRobinHostManagerTest, ExecutorDispatcherTest
---
.../master/dispatch/host/CommonHostManager.java | 2 +-
.../master/dispatch/ExecutorDispatcherTest.java | 82 ++++++++++++++++++++++
.../dispatch/host/RoundRobinHostManagerTest.java | 78 ++++++++++++++++++++
.../{ => assign}/LowerWeightRoundRobinTest.java | 4 +-
.../host/{ => assign}/RandomSelectorTest.java | 3 +-
.../host/{ => assign}/RoundRobinSelectorTest.java | 3 +-
.../server/registry/DependencyConfig.java | 13 ++++
.../server/utils/ExecutionContextTestUtils.java | 54 ++++++++++++++
8 files changed, 231 insertions(+), 8 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 080ce7a..58006bf 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -76,7 +76,7 @@ public abstract class CommonHostManager implements
HostManager {
return select(candidateHosts);
}
- public abstract Host select(Collection<Host> nodes);
+ protected abstract Host select(Collection<Host> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager
zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
new file mode 100644
index 0000000..958df01
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch;
+
+
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.registry.DependencyConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * executor dispatch test
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={DependencyConfig.class,
SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class,
+ NettyExecutorManager.class, ExecutorDispatcher.class,
ZookeeperRegistryCenter.class, WorkerConfig.class,
+ ZookeeperNodeManager.class, ZookeeperCachedOperator.class,
ZookeeperConfig.class})
+public class ExecutorDispatcherTest {
+
+ @Autowired
+ private ExecutorDispatcher executorDispatcher;
+
+ @Autowired
+ private WorkerRegistry workerRegistry;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Test(expected = ExecuteException.class)
+ public void testDispatchWithException() throws ExecuteException {
+ ExecutionContext executionContext =
ExecutionContextTestUtils.getExecutionContext(10000);
+ executorDispatcher.dispatch(executionContext);
+ }
+
+ @Test
+ public void testDispatch() throws ExecuteException {
+ int port = 30000;
+ final NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(port);
+ NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
+
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST,
Mockito.mock(TaskExecuteProcessor.class));
+ nettyRemotingServer.start();
+ //
+ workerConfig.setListenPort(port);
+ workerRegistry.registry();
+
+ ExecutionContext executionContext =
ExecutionContextTestUtils.getExecutionContext(port);
+ executorDispatcher.dispatch(executionContext);
+ }
+}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
new file mode 100644
index 0000000..e223a76
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.registry.DependencyConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+
+/**
+ * round robin host manager test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class,
WorkerRegistry.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
+ ZookeeperNodeManager.class, ZookeeperCachedOperator.class,
ZookeeperConfig.class})
+public class RoundRobinHostManagerTest {
+
+
+ @Autowired
+ private ZookeeperNodeManager zookeeperNodeManager;
+
+ @Autowired
+ private WorkerRegistry workerRegistry;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Test
+ public void testSelectWithEmptyResult(){
+ RoundRobinHostManager roundRobinHostManager = new
RoundRobinHostManager();
+ roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager);
+ ExecutionContext context =
ExecutionContextTestUtils.getExecutionContext(10000);
+ Host emptyHost = roundRobinHostManager.select(context);
+ Assert.assertTrue(StringUtils.isEmpty(emptyHost.getAddress()));
+ }
+
+ @Test
+ public void testSelectWithResult(){
+ workerRegistry.registry();
+ RoundRobinHostManager roundRobinHostManager = new
RoundRobinHostManager();
+ roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager);
+ ExecutionContext context =
ExecutionContextTestUtils.getExecutionContext(10000);
+ Host host = roundRobinHostManager.select(context);
+ Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress()));
+ Assert.assertTrue(host.getAddress().equalsIgnoreCase(OSUtils.getHost()
+ ":" + workerConfig.getListenPort()));
+ }
+}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
similarity index 86%
rename from
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
rename to
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index 10936a6..fadaa84 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
-import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
-import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.junit.Test;
import java.util.ArrayList;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
similarity index 92%
rename from
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
rename to
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index 1d7e03e..a14ea32 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
similarity index 92%
rename from
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
rename to
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index a34e667..adc55a4 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
index cd5a221..2d6e3ec 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
@@ -20,6 +20,9 @@ package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.*;
import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
+import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
@@ -131,4 +134,14 @@ public class DependencyConfig {
public TaskCallbackService taskCallbackService(){
return Mockito.mock(TaskCallbackService.class);
}
+
+ @Bean
+ public HostManager hostManager(){
+ return new RandomHostManager();
+ }
+
+ @Bean
+ public TaskManager taskManager(){
+ return Mockito.mock(TaskManager.class);
+ }
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
new file mode 100644
index 0000000..26d904f
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.utils;
+
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.mockito.Mockito;
+
+/**
+ * for test use only
+ */
+public class ExecutionContextTestUtils {
+
+
+ public static ExecutionContext getExecutionContext(int port){
+ TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
+ ProcessDefinition processDefinition =
Mockito.mock(ProcessDefinition.class);
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
+ taskInstance.setProcessInstance(processInstance);
+ TaskExecutionContext context = TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .buildProcessDefinitionRelatedInfo(processDefinition)
+ .create();
+ ExecutionContext executionContext = new
ExecutionContext(context.toCommand(), ExecutorType.WORKER);
+ executionContext.setHost(Host.of(OSUtils.getHost() + ":" + port));
+
+ return executionContext;
+ }
+}