This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 4eebde8 Refactor worker (#2103)
4eebde8 is described below
commit 4eebde835594e0a7f719eaf1d3d288bbbdf9e3f0
Author: Tboy <[email protected]>
AuthorDate: Fri Mar 6 18:18:49 2020 +0800
Refactor worker (#2103)
* refactor kill logic
* refactor ExecutionContext
* refactor worker
---
.../server/master/MasterServer.java | 13 +--
.../server/worker/WorkerServer.java | 27 +-----
.../dolphinscheduler/server/zk/ZKMasterClient.java | 4 -
.../dolphinscheduler/server/zk/ZKWorkerClient.java | 103 ---------------------
.../service/zk/AbstractZKClient.java | 17 ----
5 files changed, 8 insertions(+), 156 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 12fe25b..292bfae 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService;
* master server
*/
@ComponentScan("org.apache.dolphinscheduler")
-public class MasterServer implements IStoppable {
+public class MasterServer {
/**
* logger of MasterServer
@@ -142,8 +142,6 @@ public class MasterServer implements IStoppable {
masterSchedulerService =
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
- zkMasterClient.setStoppable(this);
-
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new
MasterSchedulerThread(
zkMasterClient,
@@ -180,7 +178,7 @@ public class MasterServer implements IStoppable {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
- stop("shutdownhook");
+ close("shutdownhook");
}
}));
}
@@ -190,8 +188,7 @@ public class MasterServer implements IStoppable {
* gracefully stop
* @param cause why stopping
*/
- @Override
- public synchronized void stop(String cause) {
+ public void close(String cause) {
try {
//execute only once
@@ -225,10 +222,10 @@ public class MasterServer implements IStoppable {
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
- logger.warn("threadpool service stopped
exception:{}",e.getMessage());
+ logger.warn("threadPool service stopped
exception:{}",e.getMessage());
}
- logger.info("threadpool service stopped");
+ logger.info("threadPool service stopped");
try {
masterSchedulerService.shutdownNow();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index fb35f47..ff8ff00 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -29,9 +28,7 @@ import
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
-import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -47,20 +44,13 @@ import java.util.concurrent.ExecutorService;
* worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer implements IStoppable {
+public class WorkerServer {
/**
* logger
*/
private static final Logger logger =
LoggerFactory.getLogger(WorkerServer.class);
- /**
- * zk worker client
- */
- @Autowired
- private ZKWorkerClient zkWorkerClient = null;
-
-
/**
* fetch task executor service
@@ -130,21 +120,16 @@ public class WorkerServer implements IStoppable {
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(),
workerConfig.getWorkerGroup());
this.workerRegistry.registry();
- this.zkWorkerClient.init();
-
-
this.fetchTaskExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
- zkWorkerClient.setStoppable(this);
-
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- stop("shutdownHook");
+ close("shutdownHook");
}
}));
@@ -156,8 +141,7 @@ public class WorkerServer implements IStoppable {
}
}
- @Override
- public synchronized void stop(String cause) {
+ public void close(String cause) {
try {
//execute only once
@@ -195,11 +179,6 @@ public class WorkerServer implements IStoppable {
}
logger.info("worker fetch task service stopped");
- try{
- zkWorkerClient.close();
- }catch (Exception e){
- logger.warn("zookeeper service stopped
exception:{}",e.getMessage());
- }
latch.countDown();
logger.info("zookeeper service stopped");
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 77d2139..7fc91dc 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -245,10 +245,6 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master node added : {}", path);
break;
case NODE_REMOVED:
- String serverHost =
getHostByEventDataPath(path);
- if (checkServerSelfDead(serverHost,
ZKNodeType.MASTER)) {
- return;
- }
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
default:
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
deleted file mode 100644
index a1d70f8..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.zk;
-
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-
-/**
- * zookeeper worker client
- * single instance
- */
-@Component
-public class ZKWorkerClient extends AbstractZKClient {
-
- /**
- * logger
- */
- private static final Logger logger =
LoggerFactory.getLogger(ZKWorkerClient.class);
-
-
- /**
- * worker znode
- */
- private String workerZNode = null;
-
-
- /**
- * init
- */
- public void init(){
-
- logger.info("initialize worker client...");
- // init system znode
- this.initSystemZNode();
-
- }
-
- /**
- * handle path events that this class cares about
- * @param client zkClient
- * @param event path event
- * @param path zk path
- */
- @Override
- protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
-
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
- handleWorkerEvent(event,path);
- }
- }
-
- /**
- * monitor worker
- * @param event event
- * @param path path
- */
- public void handleWorkerEvent(TreeCacheEvent event, String path){
- switch (event.getType()) {
- case NODE_ADDED:
- logger.info("worker node added : {}", path);
- break;
- case NODE_REMOVED:
- //find myself dead
- String serverHost =
getHostByEventDataPath(path);
- if(checkServerSelfDead(serverHost,
ZKNodeType.WORKER)){
- return;
- }
- break;
- default:
- break;
- }
- }
-
- /**
- * get worker znode
- * @return worker zookeeper node
- */
- public String getWorkerZNode() {
- return workerZNode;
- }
-
-}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 6e887f8..24bf259 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -370,23 +370,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
}
/**
- * server self dead, stop all threads
- * @param serverHost server host
- * @param zkNodeType zookeeper node type
- * @return true if server dead and stop all threads
- */
- protected boolean checkServerSelfDead(String serverHost, ZKNodeType
zkNodeType) {
- if (serverHost.equals(OSUtils.getHost())) {
- logger.error("{} server({}) of myself dead ,
stopping...",
- zkNodeType.toString(), serverHost);
- stoppable.stop(String.format(" %s server %s of myself
dead , stopping...",
- zkNodeType.toString(), serverHost));
- return true;
- }
- return false;
- }
-
- /**
* get host ip, string format: masterParentPath/ip
* @param path path
* @return host ip, string format: masterParentPath/ip