This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 8c545ff Refactor worker (#2026)
8c545ff is described below
commit 8c545ffa9bda86f5212fc30aa7cc02b9ffd85783
Author: Tboy <[email protected]>
AuthorDate: Thu Feb 27 09:54:40 2020 +0800
Refactor worker (#2026)
* Refactor worker (#10)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* Refactor worker (#2018)
* Refactor worker (#7)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* Refactor worker (#8)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* add kill command
Co-authored-by: qiaozhanwei <[email protected]>
* add TaskInstanceCacheManager to receive Worker report results, change master
polling from db to cache (#2021)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager to receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
Co-authored-by: qiaozhanwei <[email protected]>
* refactor heartbeat logic
Co-authored-by: qiaozhanwei <[email protected]>
---
.../server/worker/WorkerServer.java | 46 +----------------
.../server/worker/registry/WorkerRegistry.java | 58 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 46 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index c2af7b1..f53f187 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -91,11 +91,6 @@ public class WorkerServer implements IStoppable {
private AlertDao alertDao;
/**
- * heartbeat thread pool
- */
- private ScheduledExecutorService heartbeatWorkerService;
-
- /**
* task queue impl
*/
protected ITaskQueue taskQueue;
@@ -155,6 +150,7 @@ public class WorkerServer implements IStoppable {
*/
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
+ System.setProperty("spring.profiles.active","worker");
new
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
@@ -173,7 +169,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new
TaskKillProcessor());
this.nettyRemotingServer.start();
- this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort());
+ this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval());
this.workerRegistry.registry();
this.zkWorkerClient.init();
@@ -184,17 +180,8 @@ public class WorkerServer implements IStoppable {
this.fetchTaskExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
- heartbeatWorkerService =
ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor",
Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
-
- // heartbeat thread implement
- Runnable heartBeatThread = heartBeatThread();
-
zkWorkerClient.setStoppable(this);
- // regular heartbeat
- // delay 5 seconds, send heartbeat every 30 seconds
- heartbeatWorkerService.scheduleAtFixedRate(heartBeatThread, 5,
workerConfig.getWorkerHeartbeatInterval(), TimeUnit.SECONDS);
-
// kill process thread implement
Runnable killProcessThread = getKillProcessThread();
@@ -256,13 +243,6 @@ public class WorkerServer implements IStoppable {
this.workerRegistry.unRegistry();
try {
- heartbeatWorkerService.shutdownNow();
- }catch (Exception e){
- logger.warn("heartbeat service stopped exception");
- }
- logger.info("heartbeat service stopped");
-
- try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped
exception:{}",e.getMessage());
@@ -299,28 +279,6 @@ public class WorkerServer implements IStoppable {
}
/**
- * heartbeat thread implement
- *
- * @return
- */
- private Runnable heartBeatThread(){
- logger.info("start worker heart beat thread...");
- Runnable heartBeatThread = new Runnable() {
- @Override
- public void run() {
- // send heartbeat to zk
- if (StringUtils.isEmpty(zkWorkerClient.getWorkerZNode())){
- logger.error("worker send heartbeat to zk failed");
- }
-
- zkWorkerClient.heartBeatForZk(zkWorkerClient.getWorkerZNode()
, Constants.WORKER_PREFIX);
- }
- };
- return heartBeatThread;
- }
-
-
- /**
* kill process thread implement
*
* @return kill process thread
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index a0f4e66..b6f6896 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -19,11 +19,22 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+
+
/**
* worker registry
*/
@@ -42,13 +53,31 @@ public class WorkerRegistry {
private final int port;
/**
+ * heartbeat interval
+ */
+ private final long heartBeatInterval;
+
+ /**
+ * heartbeat executor
+ */
+ private final ScheduledExecutorService heartBeatExecutor;
+
+ /**
+ * worker start time
+ */
+ private final String startTime;
+
+ /**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
- public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port){
+ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
+ this.heartBeatInterval = heartBeatInterval;
+ this.startTime = DateUtils.dateToString(new Date());
+ this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
/**
@@ -71,7 +100,9 @@ public class WorkerRegistry {
}
}
});
- logger.info("worker node : {} registry to ZK successfully.", address);
+ this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(),
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
+ logger.info("worker node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, heartBeatInterval);
+
}
/**
@@ -81,6 +112,7 @@ public class WorkerRegistry {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+ this.heartBeatExecutor.shutdownNow();
logger.info("worker node : {} unRegistry to ZK.", address);
}
@@ -101,4 +133,26 @@ public class WorkerRegistry {
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
+
+ /**
+ * hear beat task
+ */
+ class HeartBeatTask implements Runnable{
+
+ @Override
+ public void run() {
+ try {
+ StringBuilder builder = new StringBuilder(100);
+ builder.append(OSUtils.cpuUsage()).append(COMMA);
+ builder.append(OSUtils.memoryUsage()).append(COMMA);
+ builder.append(OSUtils.loadAverage()).append(COMMA);
+ builder.append(startTime).append(COMMA);
+ builder.append(DateUtils.dateToString(new Date()));
+ String workerPath = getWorkerPath();
+
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath,
builder.toString());
+ } catch (Throwable ex){
+ logger.error("error write worker heartbeat info", ex);
+ }
+ }
+ }
}