This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 8c545ff  Refactor worker (#2026)
8c545ff is described below

commit 8c545ffa9bda86f5212fc30aa7cc02b9ffd85783
Author: Tboy <[email protected]>
AuthorDate: Thu Feb 27 09:54:40 2020 +0800

    Refactor worker (#2026)
    
    * Refactor worker (#10)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * Refactor worker (#2018)
    
    * Refactor worker (#7)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * Refactor worker (#8)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * add kill command
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * add TaskInstanceCacheManager receive Worker report result, modify master 
polling db transform to cache (#2021)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * refactor heartbeat logic
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../server/worker/WorkerServer.java                | 46 +----------------
 .../server/worker/registry/WorkerRegistry.java     | 58 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 46 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index c2af7b1..f53f187 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -91,11 +91,6 @@ public class WorkerServer implements IStoppable {
     private AlertDao alertDao;
 
     /**
-     * heartbeat thread pool
-     */
-    private ScheduledExecutorService heartbeatWorkerService;
-
-    /**
      * task queue impl
      */
     protected ITaskQueue taskQueue;
@@ -155,6 +150,7 @@ public class WorkerServer implements IStoppable {
      */
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
+        System.setProperty("spring.profiles.active","worker");
         new 
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
     }
 
@@ -173,7 +169,7 @@ public class WorkerServer implements IStoppable {
         
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new 
TaskKillProcessor());
         this.nettyRemotingServer.start();
 
-        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort());
+        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval());
         this.workerRegistry.registry();
 
         this.zkWorkerClient.init();
@@ -184,17 +180,8 @@ public class WorkerServer implements IStoppable {
 
         this.fetchTaskExecutorService = 
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 
-        heartbeatWorkerService = 
ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor",
 Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
-
-        // heartbeat thread implement
-        Runnable heartBeatThread = heartBeatThread();
-
         zkWorkerClient.setStoppable(this);
 
-        // regular heartbeat
-        // delay 5 seconds, send heartbeat every 30 seconds
-        heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5, 
workerConfig.getWorkerHeartbeatInterval(), TimeUnit.SECONDS);
-
         // kill process thread implement
         Runnable killProcessThread = getKillProcessThread();
 
@@ -256,13 +243,6 @@ public class WorkerServer implements IStoppable {
             this.workerRegistry.unRegistry();
 
             try {
-                heartbeatWorkerService.shutdownNow();
-            }catch (Exception e){
-                logger.warn("heartbeat service stopped exception");
-            }
-            logger.info("heartbeat service stopped");
-
-            try {
                 ThreadPoolExecutors.getInstance().shutdown();
             }catch (Exception e){
                 logger.warn("threadpool service stopped 
exception:{}",e.getMessage());
@@ -299,28 +279,6 @@ public class WorkerServer implements IStoppable {
     }
 
     /**
-     * heartbeat thread implement
-     *
-     * @return
-     */
-    private Runnable heartBeatThread(){
-        logger.info("start worker heart beat thread...");
-        Runnable heartBeatThread  = new Runnable() {
-            @Override
-            public void run() {
-                // send heartbeat to zk
-                if (StringUtils.isEmpty(zkWorkerClient.getWorkerZNode())){
-                    logger.error("worker send heartbeat to zk failed");
-                }
-
-                zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode() 
, Constants.WORKER_PREFIX);
-            }
-        };
-        return heartBeatThread;
-    }
-
-
-    /**
      * kill process thread implement
      *
      * @return kill process thread
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index a0f4e66..b6f6896 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -19,11 +19,22 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+
+
 /**
  *  worker registry
  */
@@ -42,13 +53,31 @@ public class WorkerRegistry {
     private final int port;
 
     /**
+     * heartbeat interval
+     */
+    private final long heartBeatInterval;
+
+    /**
+     * heartbeat executor
+     */
+    private final ScheduledExecutorService heartBeatExecutor;
+
+    /**
+     * worker start time
+     */
+    private final String startTime;
+
+    /**
      *  construct
      * @param zookeeperRegistryCenter zookeeperRegistryCenter
      * @param port port
      */
-    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port){
+    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port, long heartBeatInterval){
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
         this.port = port;
+        this.heartBeatInterval = heartBeatInterval;
+        this.startTime = DateUtils.dateToString(new Date());
+        this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
 
     /**
@@ -71,7 +100,9 @@ public class WorkerRegistry {
                 }
             }
         });
-        logger.info("worker node : {} registry to ZK successfully.", address);
+        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), 
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
+        logger.info("worker node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, heartBeatInterval);
+
     }
 
     /**
@@ -81,6 +112,7 @@ public class WorkerRegistry {
         String address = getLocalAddress();
         String localNodePath = getWorkerPath();
         
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+        this.heartBeatExecutor.shutdownNow();
         logger.info("worker node : {} unRegistry to ZK.", address);
     }
 
@@ -101,4 +133,26 @@ public class WorkerRegistry {
     private String getLocalAddress(){
         return Constants.LOCAL_ADDRESS + ":" + port;
     }
+
+    /**
+     * hear beat task
+     */
+    class HeartBeatTask implements Runnable{
+
+        @Override
+        public void run() {
+            try {
+                StringBuilder builder = new StringBuilder(100);
+                builder.append(OSUtils.cpuUsage()).append(COMMA);
+                builder.append(OSUtils.memoryUsage()).append(COMMA);
+                builder.append(OSUtils.loadAverage()).append(COMMA);
+                builder.append(startTime).append(COMMA);
+                builder.append(DateUtils.dateToString(new Date()));
+                String workerPath = getWorkerPath();
+                
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath, 
builder.toString());
+            } catch (Throwable ex){
+                logger.error("error write worker heartbeat info", ex);
+            }
+        }
+    }
 }

Reply via email to