This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 4eebde8  Refactor worker (#2103)
4eebde8 is described below

commit 4eebde835594e0a7f719eaf1d3d288bbbdf9e3f0
Author: Tboy <[email protected]>
AuthorDate: Fri Mar 6 18:18:49 2020 +0800

    Refactor worker (#2103)
    
    * refactor kill logic
    
    * refactor ExecutionContext
    
    * refactor worker
---
 .../server/master/MasterServer.java                |  13 +--
 .../server/worker/WorkerServer.java                |  27 +-----
 .../dolphinscheduler/server/zk/ZKMasterClient.java |   4 -
 .../dolphinscheduler/server/zk/ZKWorkerClient.java | 103 ---------------------
 .../service/zk/AbstractZKClient.java               |  17 ----
 5 files changed, 8 insertions(+), 156 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 12fe25b..292bfae 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService;
  * master server
  */
 @ComponentScan("org.apache.dolphinscheduler")
-public class MasterServer implements IStoppable {
+public class MasterServer {
 
     /**
      * logger of MasterServer
@@ -142,8 +142,6 @@ public class MasterServer implements IStoppable {
 
         masterSchedulerService = 
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
 
-        zkMasterClient.setStoppable(this);
-
         // master scheduler thread
         MasterSchedulerThread masterSchedulerThread = new 
MasterSchedulerThread(
                 zkMasterClient,
@@ -180,7 +178,7 @@ public class MasterServer implements IStoppable {
                     zkMasterClient.getAlertDao().sendServerStopedAlert(
                         1, OSUtils.getHost(), "Master-Server");
                 }
-                stop("shutdownhook");
+                close("shutdownhook");
             }
         }));
     }
@@ -190,8 +188,7 @@ public class MasterServer implements IStoppable {
      * gracefully stop
      * @param cause why stopping
      */
-    @Override
-    public synchronized void stop(String cause) {
+    public void close(String cause) {
 
         try {
             //execute only once
@@ -225,10 +222,10 @@ public class MasterServer implements IStoppable {
             try {
                 ThreadPoolExecutors.getInstance().shutdown();
             }catch (Exception e){
-                logger.warn("threadpool service stopped 
exception:{}",e.getMessage());
+                logger.warn("threadPool service stopped 
exception:{}",e.getMessage());
             }
 
-            logger.info("threadpool service stopped");
+            logger.info("threadPool service stopped");
 
             try {
                 masterSchedulerService.shutdownNow();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index fb35f47..ff8ff00 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,7 +17,6 @@
 package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -29,9 +28,7 @@ import 
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
-import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -47,20 +44,13 @@ import java.util.concurrent.ExecutorService;
  *  worker server
  */
 @ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer implements IStoppable {
+public class WorkerServer {
 
     /**
      * logger
      */
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerServer.class);
 
-    /**
-     *  zk worker client
-     */
-    @Autowired
-    private ZKWorkerClient zkWorkerClient = null;
-
-
 
     /**
      *  fetch task executor service
@@ -130,21 +120,16 @@ public class WorkerServer implements IStoppable {
         this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), 
workerConfig.getWorkerGroup());
         this.workerRegistry.registry();
 
-        this.zkWorkerClient.init();
-
-
 
         this.fetchTaskExecutorService = 
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 
-        zkWorkerClient.setStoppable(this);
-
         /**
          * register hooks, which are called before the process exits
          */
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                stop("shutdownHook");
+                close("shutdownHook");
             }
         }));
 
@@ -156,8 +141,7 @@ public class WorkerServer implements IStoppable {
         }
     }
 
-    @Override
-    public synchronized void stop(String cause) {
+    public void close(String cause) {
 
         try {
             //execute only once
@@ -195,11 +179,6 @@ public class WorkerServer implements IStoppable {
             }
             logger.info("worker fetch task service stopped");
 
-            try{
-                zkWorkerClient.close();
-            }catch (Exception e){
-                logger.warn("zookeeper service stopped 
exception:{}",e.getMessage());
-            }
             latch.countDown();
             logger.info("zookeeper service stopped");
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 77d2139..7fc91dc 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -245,10 +245,6 @@ public class ZKMasterClient extends AbstractZKClient {
                                logger.info("master node added : {}", path);
                                break;
                        case NODE_REMOVED:
-                               String serverHost = 
getHostByEventDataPath(path);
-                               if (checkServerSelfDead(serverHost, 
ZKNodeType.MASTER)) {
-                                       return;
-                               }
                                removeZKNodePath(path, ZKNodeType.MASTER, true);
                                break;
                        default:
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
deleted file mode 100644
index a1d70f8..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.zk;
-
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-
-/**
- *  zookeeper worker client
- *  single instance
- */
-@Component
-public class ZKWorkerClient extends AbstractZKClient {
-
-       /**
-        * logger
-        */
-       private static final Logger logger = 
LoggerFactory.getLogger(ZKWorkerClient.class);
-
-
-       /**
-        * worker znode
-        */
-       private String workerZNode = null;
-
-
-       /**
-        * init
-        */
-       public void init(){
-
-               logger.info("initialize worker client...");
-               // init system znode
-               this.initSystemZNode();
-
-       }
-
-       /**
-        * handle path events that this class cares about
-        * @param client   zkClient
-        * @param event    path event
-        * @param path     zk path
-        */
-       @Override
-       protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
-               
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
-                       handleWorkerEvent(event,path);
-               }
-       }
-
-       /**
-        * monitor worker
-        * @param event event
-        * @param path path
-        */
-       public void handleWorkerEvent(TreeCacheEvent event, String path){
-               switch (event.getType()) {
-                       case NODE_ADDED:
-                               logger.info("worker node added : {}", path);
-                               break;
-                       case NODE_REMOVED:
-                               //find myself dead
-                               String serverHost = 
getHostByEventDataPath(path);
-                               if(checkServerSelfDead(serverHost, 
ZKNodeType.WORKER)){
-                                       return;
-                               }
-                               break;
-                       default:
-                               break;
-               }
-       }
-
-       /**
-        * get worker znode
-        * @return worker zookeeper node
-        */
-       public String getWorkerZNode() {
-               return workerZNode;
-       }
-
-}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 6e887f8..24bf259 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -370,23 +370,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
        }
 
        /**
-        * server self dead, stop all threads
-        * @param serverHost server host
-        * @param zkNodeType zookeeper node type
-        * @return true if server dead and stop all threads
-        */
-       protected boolean checkServerSelfDead(String serverHost, ZKNodeType 
zkNodeType) {
-               if (serverHost.equals(OSUtils.getHost())) {
-                       logger.error("{} server({}) of myself dead , 
stopping...",
-                                       zkNodeType.toString(), serverHost);
-                       stoppable.stop(String.format(" %s server %s of myself 
dead , stopping...",
-                                       zkNodeType.toString(), serverHost));
-                       return true;
-               }
-               return false;
-       }
-
-       /**
         *  get host ip, string format: masterParentPath/ip
         * @param path path
         * @return host ip, string format: masterParentPath/ip

Reply via email to