This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
new 7100247 [1.3.6-Prepare][Improvement-4624][cherry pick] When the
server exists in the dead server list of ZK, it needs to stop its service by itself (#5013)
7100247 is described below
commit 710024719e2353b491801bdb6a2daa2ce8f21ed7
Author: lgcareer <[email protected]>
AuthorDate: Thu Mar 11 21:45:43 2021 +0800
[1.3.6-Prepare][Improvement-4624][cherry pick] When the server exists in the
dead server list of ZK, it needs to stop its service by itself (#5013)
---
.../server/master/MasterServer.java | 63 ++-
.../dispatch/host/LowerWeightHostManager.java | 2 +-
.../server/master/registry/MasterRegistry.java | 73 +--
.../server/registry/HeartBeatTask.java | 40 +-
.../server/registry/ZookeeperNodeManager.java | 4 +-
.../server/registry/ZookeeperRegistryCenter.java | 92 +++-
.../server/worker/WorkerServer.java | 30 +-
.../server/worker/registry/WorkerRegistry.java | 60 ++-
.../dolphinscheduler/server/zk/ZKMasterClient.java | 7 +-
.../consumer/TaskPriorityQueueConsumerTest.java | 35 +-
.../server/master/registry/MasterRegistryTest.java | 25 +-
.../registry/ZookeeperRegistryCenterTest.java | 61 +++
.../worker/processor/TaskCallbackServiceTest.java | 78 ++-
.../server/worker/registry/WorkerRegistryTest.java | 45 +-
.../service/zk/AbstractZKClient.java | 542 +++++++++------------
.../service/zk/RegisterOperator.java | 155 ++++++
.../service/zk/ZookeeperConfig.java | 11 +
.../service/zk/RegisterOperatorTest.java | 116 +++++
pom.xml | 3 +
19 files changed, 939 insertions(+), 503 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 18882a2..f1962c7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,6 +26,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -42,13 +44,10 @@ import
org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
-
-
-
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes =
{WorkerServer.class})
})
-public class MasterServer {
+public class MasterServer implements IStoppable {
/**
* logger of MasterServer
@@ -74,6 +73,12 @@ public class MasterServer {
private NettyRemotingServer nettyRemotingServer;
/**
+ * master registry
+ */
+ @Autowired
+ private MasterRegistry masterRegistry;
+
+ /**
* zk master client
*/
@Autowired
@@ -100,19 +105,26 @@ public class MasterServer {
* run master server
*/
@PostConstruct
- public void run(){
+ public void run() {
+ try {
+ //init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(masterConfig.getListenPort());
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
+ this.nettyRemotingServer.start();
- //init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(masterConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
- this.nettyRemotingServer.start();
+
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
+
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
// self tolerant
- this.zkMasterClient.start();
+ this.zkMasterClient.start(this);
// scheduler start
this.masterSchedulerService.start();
@@ -137,7 +149,9 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- close("shutdownHook");
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
+ }
}
}));
@@ -145,13 +159,14 @@ public class MasterServer {
/**
* gracefully close
+ *
* @param cause close cause
*/
public void close(String cause) {
try {
//execute only once
- if(Stopper.isStopped()){
+ if (Stopper.isStopped()) {
return;
}
@@ -163,24 +178,32 @@ public class MasterServer {
try {
//thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
- }catch (Exception e){
+ } catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
//
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
+ this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
- try{
+ try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
- }catch (Exception e){
- logger.warn("Quartz service stopped
exception:{}",e.getMessage());
+ } catch (Exception e) {
+ logger.warn("Quartz service stopped exception:{}",
e.getMessage());
}
+
} catch (Exception e) {
logger.error("master server stop exception ", e);
+ } finally {
System.exit(-1);
}
}
+
+ @Override
+ public void stop(String cause) {
+ close(cause);
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 1872ae0..ac7d8b0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -149,7 +149,7 @@ public class LowerWeightHostManager extends
CommonHostManager {
String workerGroupPath =
registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
- String heartbeat =
registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+ String heartbeat =
registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length ==
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 0f41ba5..0624f0c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -14,8 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.registry;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+
+import org.apache.curator.framework.state.ConnectionState;
+
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -23,15 +34,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,7 +42,7 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
/**
- * master registry
+ * master registry
*/
@Service
public class MasterRegistry {
@@ -48,7 +50,7 @@ public class MasterRegistry {
private final Logger logger =
LoggerFactory.getLogger(MasterRegistry.class);
/**
- * zookeeper registry center
+ * zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@@ -65,42 +67,41 @@ public class MasterRegistry {
private ScheduledExecutorService heartBeatExecutor;
/**
- * worker start time
+ * master start time
*/
private String startTime;
-
@PostConstruct
- public void init(){
+ public void init() {
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
/**
- * registry
+ * registry
*/
public void registry() {
String address = OSUtils.getHost();
String localNodePath = getMasterPath();
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
-
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState
newState) {
- if(newState == ConnectionState.LOST){
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath,
"");
+
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+ (client, newState) -> {
+ if (newState == ConnectionState.LOST) {
logger.error("master : {} connection lost from zookeeper",
address);
- } else if(newState == ConnectionState.RECONNECTED){
+ } else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper",
address);
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
- } else if(newState == ConnectionState.SUSPENDED){
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath,
"");
+ } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address);
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath,
"");
}
- }
- });
+ });
int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
Sets.newHashSet(getMasterPath()),
+ Constants.MASTER_PREFIX,
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
@@ -108,31 +109,37 @@ public class MasterRegistry {
}
/**
- * remove registry info
+ * remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getMasterPath();
-
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+ zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address);
}
/**
- * get master path
- * @return
+ * get master path
*/
- private String getMasterPath() {
+ public String getMasterPath() {
String address = getLocalAddress();
- String localNodePath = this.zookeeperRegistryCenter.getMasterPath() +
"/" + address;
- return localNodePath;
+ return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
}
/**
- * get local address
+ * get local address
* @return
*/
private String getLocalAddress(){
return OSUtils.getAddr(masterConfig.getListenPort());
}
+ /**
+ * get zookeeper registry center
+ * @return ZookeeperRegistryCenter
+ */
+ public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+ return zookeeperRegistryCenter;
+ }
+
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index bd8c79c..90d6ea3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -19,16 +19,21 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-import java.util.Date;
-import java.util.Set;
-
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import java.util.Date;
+import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HeartBeatTask extends Thread {
+/**
+ * Heart beat task
+ */
+public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
@@ -36,23 +41,39 @@ public class HeartBeatTask extends Thread {
private double reservedMemory;
private double maxCpuloadAvg;
private Set<String> heartBeatPaths;
+ private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
+ /**
+ * server stop or not
+ */
+ protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
Set<String> heartBeatPaths,
+ String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
this.heartBeatPaths = heartBeatPaths;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+ this.serverType = serverType;
}
@Override
public void run() {
try {
+
+ // check dead or not in zookeeper
+ for (String heartBeatPath : heartBeatPaths) {
+ if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath,
serverType)) {
+ zookeeperRegistryCenter.getStoppable().stop("i was judged
to death, release resources and stop myself");
+ return;
+ }
+ }
+
double availablePhysicalMemorySize =
OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
@@ -78,10 +99,19 @@ public class HeartBeatTask extends Thread {
builder.append(OSUtils.getProcessID());
for (String heartBeatPath : heartBeatPaths) {
-
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath,
builder.toString());
+
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath,
builder.toString());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
+
+ /**
+ * for stop server
+ *
+ * @param serverStoppable server stoppable interface
+ */
+ public void setStoppable(IStoppable serverStoppable) {
+ this.stoppable = serverStoppable;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 864276b..bae4d14 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements
InitializingBean {
/**
* init MasterNodeListener listener
*/
- registryCenter.getZookeeperCachedOperator().addListener(new
MasterNodeListener());
+ registryCenter.getRegisterOperator().addListener(new
MasterNodeListener());
/**
* init WorkerNodeListener listener
*/
- registryCenter.getZookeeperCachedOperator().addListener(new
WorkerGroupNodeListener());
+ registryCenter.getRegisterOperator().addListener(new
WorkerGroupNodeListener());
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3ca62be..9017a13 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -17,19 +17,27 @@
package org.apache.dolphinscheduler.server.registry;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
/**
- * zookeeper register center
+ * zookeeper register center
*/
@Service
public class ZookeeperRegistryCenter implements InitializingBean {
@@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
@Autowired
- protected ZookeeperCachedOperator zookeeperCachedOperator;
-
+ protected RegisterOperator registerOperator;
@Autowired
- private ZookeeperConfig zookeeperConfig;
+ private ZookeeperConfig zookeeperConfig;
/**
* nodes namespace
@@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
public final String EMPTY = "";
+ private IStoppable stoppable;
+
@Override
public void afterPropertiesSet() throws Exception {
NODES = zookeeperConfig.getDsRoot() + "/nodes";
@@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
* init nodes
*/
private void initNodes() {
- zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
- zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+ registerOperator.persist(MASTER_PATH, EMPTY);
+ registerOperator.persist(WORKER_PATH, EMPTY);
}
/**
* close
*/
public void close() {
- if (isStarted.compareAndSet(true, false)) {
- if (zookeeperCachedOperator != null) {
- zookeeperCachedOperator.close();
- }
+ if (isStarted.compareAndSet(true, false) && registerOperator != null) {
+ registerOperator.close();
}
}
/**
* get master path
+ *
* @return master path
*/
public String getMasterPath() {
@@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* get worker path
+ *
* @return worker path
*/
public String getWorkerPath() {
@@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
}
/**
- * get master nodes directly
+ * get master nodes directly
+ *
* @return master nodes
*/
public Set<String> getMasterNodesDirectly() {
@@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
}
/**
- * get worker nodes directly
+ * get worker nodes directly
+ *
* @return master nodes
*/
public Set<String> getWorkerNodesDirectly() {
@@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* get worker group directly
+ *
* @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
@@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* get worker group nodes
+ *
* @param workerGroup
* @return
*/
@@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* whether worker path
+ *
* @param path path
* @return result
*/
@@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* whether master path
+ *
* @param path path
* @return result
*/
@@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* get worker group path
+ *
* @param workerGroup workerGroup
* @return worker group path
*/
@@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
/**
* get children nodes
+ *
* @param key key
* @return children nodes
*/
public List<String> getChildrenKeys(final String key) {
- return zookeeperCachedOperator.getChildrenKeys(key);
+ return registerOperator.getChildrenKeys(key);
+ }
+
+ /**
+ * @return get dead server node parent path
+ */
+ public String getDeadZNodeParentPath() {
+ return registerOperator.getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+ }
+
+ public void setStoppable(IStoppable stoppable) {
+ this.stoppable = stoppable;
+ }
+
+ public IStoppable getStoppable() {
+ return stoppable;
}
/**
- * get zookeeperCachedOperator
- * @return zookeeperCachedOperator
+ * check dead server or not , if dead, stop self
+ *
+ * @param zNode node path
+ * @param serverType master or worker prefix
+ * @return true if not exists
+ * @throws Exception errors
*/
- public ZookeeperCachedOperator getZookeeperCachedOperator() {
- return zookeeperCachedOperator;
+ protected boolean checkIsDeadServer(String zNode, String serverType)
throws Exception {
+ //ip_sequenceno
+ String[] zNodesPath = zNode.split("\\/");
+ String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+
+ String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX :
WORKER_PREFIX;
+ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type
+ UNDERLINE + ipSeqNo;
+
+ if (!registerOperator.isExisted(zNode) ||
registerOperator.isExisted(deadServerPath)) {
+ return true;
+ }
+
+ return false;
}
+ public RegisterOperator getRegisterOperator() {
+ return registerOperator;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 6895de3..b408f6b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -29,6 +31,9 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,7 +47,7 @@ import javax.annotation.PostConstruct;
* worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer {
+public class WorkerServer implements IStoppable {
/**
* logger
@@ -106,7 +111,15 @@ public class WorkerServer {
this.nettyRemotingServer.start();
// worker registry
- this.workerRegistry.registry();
+ try {
+ this.workerRegistry.registry();
+
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
+ Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
+
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths,
ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
// retry report task status
this.retryReportTaskStatusThread.start();
@@ -117,7 +130,9 @@ public class WorkerServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- close("shutdownHook");
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
+ }
}
}));
}
@@ -126,7 +141,7 @@ public class WorkerServer {
try {
//execute only once
- if(Stopper.isStopped()){
+ if (Stopper.isStopped()) {
return;
}
@@ -138,7 +153,7 @@ public class WorkerServer {
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
- }catch (Exception e){
+ } catch (Exception e) {
logger.warn("thread sleep exception", e);
}
@@ -147,8 +162,13 @@ public class WorkerServer {
} catch (Exception e) {
logger.error("worker server stop exception ", e);
+ } finally {
System.exit(-1);
}
}
+ @Override
+ public void stop(String cause) {
+ close(cause);
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 921d0de..01e4554 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -19,6 +19,17 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.apache.curator.framework.state.ConnectionState;
+
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -27,16 +38,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +47,7 @@ import com.google.common.collect.Sets;
/**
- * worker registry
+ * worker registry
*/
@Service
public class WorkerRegistry {
@@ -54,13 +55,13 @@ public class WorkerRegistry {
private final Logger logger =
LoggerFactory.getLogger(WorkerRegistry.class);
/**
- * zookeeper registry center
+ * zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
- * worker config
+ * worker config
*/
@Autowired
private WorkerConfig workerConfig;
@@ -79,14 +80,22 @@ public class WorkerRegistry {
private Set<String> workerGroups;
@PostConstruct
- public void init(){
+ public void init() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
/**
- * registry
+ * get zookeeper registry center
+ * @return ZookeeperRegistryCenter
+ */
+ public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+ return zookeeperRegistryCenter;
+ }
+
+ /**
+ * registry
*/
public void registry() {
String address = OSUtils.getHost();
@@ -94,20 +103,18 @@ public class WorkerRegistry {
int workerHeartbeatInterval =
workerConfig.getWorkerHeartbeatInterval();
for (String workerZKPath : workerZkPaths) {
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
"");
-
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client,
ConnectionState newState) {
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
+
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+ (client,newState) -> {
if (newState == ConnectionState.LOST) {
logger.error("worker : {} connection lost from
zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper",
address);
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
"");
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ",
address);
}
- }
- });
+ });
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
}
@@ -115,6 +122,7 @@ public class WorkerRegistry {
this.workerConfig.getWorkerReservedMemory(),
this.workerConfig.getWorkerMaxCpuloadAvg(),
workerZkPaths,
+ Constants.WORKER_PREFIX,
this.zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
@@ -122,22 +130,22 @@ public class WorkerRegistry {
}
/**
- * remove registry info
+ * remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths();
for (String workerZkPath : workerZkPaths) {
-
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+ zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
logger.info("worker node : {} unRegistry from ZK {}.", address,
workerZkPath);
}
this.heartBeatExecutor.shutdownNow();
}
/**
- * get worker path
+ * get worker path
*/
- private Set<String> getWorkerZkPaths() {
+ public Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet();
String address = getLocalAddress();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 7f74c8c..2d8eed9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -72,8 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private MasterRegistry masterRegistry;
- public void start() {
-
+ public void start(MasterServer masterServer) {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock
space as /dolphinscheduler/lock/failover/master
@@ -83,6 +83,9 @@ public class ZKMasterClient extends AbstractZKClient {
// Master registry
masterRegistry.registry();
+
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
+ String registPath = this.masterRegistry.getMasterPath();
+
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath,
ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
// init system znode
this.initSystemZNode();
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index eb914f4..2c2a1b5 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -17,11 +17,20 @@
package org.apache.dolphinscheduler.server.master.consumer;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -35,8 +44,17 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -47,17 +65,10 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class,
SpringApplicationContext.class, SpringZKServer.class,
NettyExecutorManager.class, ExecutorDispatcher.class,
ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
- ZookeeperNodeManager.class, ZookeeperCachedOperator.class,
ZookeeperConfig.class, MasterConfig.class})
+ ZookeeperNodeManager.class, RegisterOperator.class,
ZookeeperConfig.class, MasterConfig.class})
public class TaskPriorityQueueConsumerTest {
@@ -503,8 +514,6 @@ public class TaskPriorityQueueConsumerTest {
TaskExecutionContext taskExecutionContext =
taskPriorityQueueConsumer.getTaskExecutionContext(1);
-
-
Assert.assertNotNull(taskExecutionContext);
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index 81d27be..2488d59 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -17,29 +17,32 @@
package org.apache.dolphinscheduler.server.master.registry;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
+import static
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
+
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/**
* master registry test
*/
@RunWith(SpringRunner.class)
-@ContextConfiguration(classes={SpringZKServer.class,
MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class})
+@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class,
ZookeeperRegistryCenter.class,
+ MasterConfig.class, ZookeeperCachedOperator.class,
ZookeeperConfig.class, CuratorZookeeperClient.class})
public class MasterRegistryTest {
@Autowired
@@ -56,18 +59,20 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2);
//wait heartbeat info write into zk node
- String masterNodePath = masterPath + Constants.SLASH +
(OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
- String heartbeat =
zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+ String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS +
":" + masterConfig.getListenPort());
+ String heartbeat =
zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH,
heartbeat.split(",").length);
+ masterRegistry.unRegistry();
}
@Test
public void testUnRegistry() throws InterruptedException {
+ masterRegistry.init();
masterRegistry.registry();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2);
//wait heartbeat info write into zk node
masterRegistry.unRegistry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
- List<String> childrenKeys =
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+ List<String> childrenKeys =
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
new file mode 100644
index 0000000..24bb25c
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * zookeeper registry center test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperRegistryCenterTest {
+
+ @InjectMocks
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ @Mock
+ protected RegisterOperator registerOperator;
+
+ @Mock
+ private ZookeeperConfig zookeeperConfig;
+
+ private static final String DS_ROOT = "/dolphinscheduler";
+
+ @Test
+ public void testGetDeadZNodeParentPath() {
+ ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
+ zookeeperConfig.setDsRoot(DS_ROOT);
+
Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
+
+ String deadZNodeParentPath =
zookeeperRegistryCenter.getDeadZNodeParentPath();
+
+ Assert.assertEquals(deadZNodeParentPath, DS_ROOT +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS);
+
+ }
+
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index c38ca3e..f228802 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.processor;
-import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@@ -33,12 +33,18 @@ import
org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import
org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.apache.curator.CuratorZookeeperClient;
+
+import java.util.Date;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -47,17 +53,32 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import java.io.IOException;
-import java.util.Date;
+import io.netty.channel.Channel;
/**
* test task call back service
+ * todo refactor it in the form of mock
*/
@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class,
SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class,
WorkerRegistry.class,
- ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
- ZookeeperCachedOperator.class, ZookeeperConfig.class,
ZookeeperNodeManager.class, TaskCallbackService.class,
- TaskResponseService.class,
TaskAckProcessor.class,TaskResponseProcessor.class})
+@ContextConfiguration(classes = {
+ TaskCallbackServiceTestConfig.class,
+ SpringZKServer.class,
+ SpringApplicationContext.class,
+ MasterRegistry.class,
+ WorkerRegistry.class,
+ ZookeeperRegistryCenter.class,
+ MasterConfig.class,
+ WorkerConfig.class,
+ RegisterOperator.class,
+ ZookeeperConfig.class,
+ ZookeeperNodeManager.class,
+ TaskCallbackService.class,
+ TaskResponseService.class,
+ TaskAckProcessor.class,
+ TaskResponseProcessor.class,
+ TaskExecuteProcessor.class,
+ CuratorZookeeperClient.class,
+ TaskExecutionContextCacheManagerImpl.class})
public class TaskCallbackServiceTest {
@Autowired
@@ -74,6 +95,7 @@ public class TaskCallbackServiceTest {
/**
* send ack test
+ *
* @throws Exception
*/
@Test
@@ -101,6 +123,7 @@ public class TaskCallbackServiceTest {
/**
* send result test
+ *
* @throws Exception
*/
@Test
@@ -140,7 +163,7 @@ public class TaskCallbackServiceTest {
@Test
public void testPause(){
- Assert.assertEquals(5000, taskCallbackService.pause(3));;
+ Assert.assertEquals(5000, taskCallbackService.pause(3));
}
@Test
@@ -171,41 +194,4 @@ public class TaskCallbackServiceTest {
nettyRemotingServer.close();
nettyRemotingClient.close();
}
-
-// @Test(expected = IllegalStateException.class)
-// public void testSendAckWithIllegalStateException2(){
-// masterRegistry.registry();
-// final NettyServerConfig serverConfig = new NettyServerConfig();
-// serverConfig.setListenPort(30000);
-// NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
-// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
taskAckProcessor);
-// nettyRemotingServer.start();
-//
-// final NettyClientConfig clientConfig = new NettyClientConfig();
-// NettyRemotingClient nettyRemotingClient = new
NettyRemotingClient(clientConfig);
-// Channel channel =
nettyRemotingClient.getChannel(Host.of("localhost:30000"));
-// taskCallbackService.addRemoteChannel(1, new
NettyRemoteChannel(channel, 1));
-// channel.close();
-// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
-// ackCommand.setTaskInstanceId(1);
-// ackCommand.setStartTime(new Date());
-//
-// nettyRemotingServer.close();
-//
-// taskCallbackService.sendAck(1, ackCommand.convert2Command());
-// try {
-// Thread.sleep(5000);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-//
-// Stopper.stop();
-//
-// try {
-// Thread.sleep(5000);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-// }
-
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index 98a78e9..0a4307b 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +63,7 @@ public class WorkerRegistryTest {
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock
- private ZookeeperCachedOperator zookeeperCachedOperator;
+ private RegisterOperator registerOperator;
@Mock
private CuratorFrameworkImpl zkClient;
@@ -69,15 +71,21 @@ public class WorkerRegistryTest {
@Mock
private WorkerConfig workerConfig;
+ private static final Set<String> workerGroups;
+
+ static {
+ workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP,
TEST_WORKER_GROUP);
+ }
+
@Before
public void before() {
- Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP,
TEST_WORKER_GROUP);
+
Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
-
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
-
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
-
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+
Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
+
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
+
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
new Listenable<ConnectionStateListener>() {
@Override
public void addListener(ConnectionStateListener
connectionStateListener) {
@@ -114,7 +122,7 @@ public class WorkerRegistryTest {
int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/"
+ (OSUtils.getAddr(workerConfig.getListenPort()));
- String heartbeat =
zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+ String heartbeat =
zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
} else {
@@ -143,6 +151,7 @@ public class WorkerRegistryTest {
Assert.assertEquals(0, testWorkerGroupPathZkChildren.size());
Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size());
+ workerRegistry.unRegistry();
}
@Test
@@ -155,7 +164,7 @@ public class WorkerRegistryTest {
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerGroupPath = workerPath + "/" + workerGroup.trim();
- List<String> childrenKeys =
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+ List<String> childrenKeys =
zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
@@ -167,4 +176,10 @@ public class WorkerRegistryTest {
workerRegistry.unRegistry();
}
+
+ @Test
+ public void testGetWorkerZkPaths() {
+ workerRegistry.init();
+
Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size());
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 1cc4db6..24cdb89 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,322 +14,256 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* abstract zookeeper client
*/
@Component
-public abstract class AbstractZKClient extends ZookeeperCachedOperator {
-
- private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
-
-
- /**
- * remove dead server by host
- * @param host host
- * @param serverType serverType
- * @throws Exception
- */
- public void removeDeadServerByHost(String host, String serverType)
throws Exception {
- List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
- for(String serverPath : deadServers){
- if(serverPath.startsWith(serverType+UNDERLINE+host)){
- String server = getDeadZNodeParentPath() +
SINGLE_SLASH + serverPath;
- super.remove(server);
- logger.info("{} server {} deleted from zk dead
server path success" , serverType , host);
- }
- }
- }
-
-
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param zNode node path
- * @param zkNodeType master or worker
- * @param opType delete or add
- * @throws Exception errors
- */
- public void handleDeadServer(String zNode, ZKNodeType zkNodeType,
String opType) throws Exception {
- String host = getHostByEventDataPath(zNode);
- String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX
: WORKER_PREFIX;
-
- //check server restart, if restart , dead server path in zk
should be delete
- if(opType.equals(DELETE_ZK_OP)){
- removeDeadServerByHost(host, type);
-
- }else if(opType.equals(ADD_ZK_OP)){
- String deadServerPath = getDeadZNodeParentPath() +
SINGLE_SLASH + type + UNDERLINE + host;
- if(!super.isExisted(deadServerPath)){
- //add dead server info to zk dead server path :
/dead-servers/
-
- super.persist(deadServerPath,(type + UNDERLINE
+ host));
-
- logger.info("{} server dead , and {} added to
zk dead server path success" ,
- zkNodeType.toString(), zNode);
- }
- }
-
- }
-
- /**
- * get active master num
- * @return active master number
- */
- public int getActiveMasterNum(){
- List<String> childrenList = new ArrayList<>();
- try {
- // read master node parent path from conf
-
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
- childrenList =
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
- }
- } catch (Exception e) {
- logger.error("getActiveMasterNum error",e);
- }
- return childrenList.size();
- }
-
- /**
- *
- * @return zookeeper quorum
- */
- public String getZookeeperQuorum(){
- return getZookeeperConfig().getServerList();
- }
-
- /**
- * get server list.
- * @param zkNodeType zookeeper node type
- * @return server list
- */
- public List<Server> getServersList(ZKNodeType zkNodeType){
- Map<String, String> masterMap = getServerMaps(zkNodeType);
- String parentPath = getZNodeParentPath(zkNodeType);
-
- List<Server> masterServers = new ArrayList<>();
- for (Map.Entry<String, String> entry : masterMap.entrySet()) {
- Server masterServer =
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
- if(masterServer == null){
- continue;
- }
- String key = entry.getKey();
- masterServer.setZkDirectory(parentPath + "/"+ key);
- //set host and port
- String[] hostAndPort=key.split(COLON);
- String[] hosts=hostAndPort[0].split(DIVISION_STRING);
- // fetch the last one
- masterServer.setHost(hosts[hosts.length-1]);
- masterServer.setPort(Integer.parseInt(hostAndPort[1]));
- masterServers.add(masterServer);
- }
- return masterServers;
- }
-
- /**
- * get master server list map.
- * @param zkNodeType zookeeper node type
- * @return result : {host : resource info}
- */
- public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
-
- Map<String, String> masterMap = new HashMap<>();
- try {
- String path = getZNodeParentPath(zkNodeType);
- List<String> serverList = super.getChildrenKeys(path);
- if(zkNodeType == ZKNodeType.WORKER){
- List<String> workerList = new ArrayList<>();
- for(String group : serverList){
- List<String> groupServers =
super.getChildrenKeys(path + Constants.SLASH + group);
- for(String groupServer : groupServers){
- workerList.add(group + Constants.SLASH
+ groupServer);
- }
- }
- serverList = workerList;
- }
- for(String server : serverList){
- masterMap.putIfAbsent(server, super.get(path +
Constants.SLASH + server));
- }
- } catch (Exception e) {
- logger.error("get server list failed", e);
- }
-
- return masterMap;
- }
-
- /**
- * check the zookeeper node already exists
- * @param host host
- * @param zkNodeType zookeeper node type
- * @return true if exists
- */
- public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
- String path = getZNodeParentPath(zkNodeType);
- if(StringUtils.isEmpty(path)){
- logger.error("check zk node exists error, host:{}, zk
node type:{}",
- host, zkNodeType.toString());
- return false;
- }
- Map<String, String> serverMaps = getServerMaps(zkNodeType);
- for(String hostKey : serverMaps.keySet()){
- if(hostKey.contains(host)){
- return true;
- }
- }
- return false;
- }
-
- /**
- *
- * @return get worker node parent path
- */
- protected String getWorkerZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
- }
-
- /**
- *
- * @return get master node parent path
- */
- protected String getMasterZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
- }
-
- /**
- *
- * @return get master lock path
- */
- public String getMasterLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
- }
-
- /**
- *
- * @param zkNodeType zookeeper node type
- * @return get zookeeper node parent path
- */
- public String getZNodeParentPath(ZKNodeType zkNodeType) {
- String path = "";
- switch (zkNodeType){
- case MASTER:
- return getMasterZNodeParentPath();
- case WORKER:
- return getWorkerZNodeParentPath();
- case DEAD_SERVER:
- return getDeadZNodeParentPath();
- default:
- break;
- }
- return path;
- }
-
- /**
- *
- * @return get dead server node parent path
- */
- protected String getDeadZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
- }
-
- /**
- *
- * @return get master start up lock path
- */
- public String getMasterStartUpLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
- }
-
- /**
- *
- * @return get master failover lock path
- */
- public String getMasterFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
- }
-
- /**
- *
- * @return get worker failover lock path
- */
- public String getWorkerFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
- }
-
- /**
- * release mutex
- * @param mutex mutex
- */
- public void releaseMutex(InterProcessMutex mutex) {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- if(e.getMessage().equals("instance must be
started before calling this method")){
- logger.warn("lock release");
- }else{
- logger.error("lock release failed",e);
- }
-
- }
- }
- }
-
- /**
- * init system znode
- */
- protected void initSystemZNode(){
- try {
- persist(getMasterZNodeParentPath(), "");
- persist(getWorkerZNodeParentPath(), "");
- persist(getDeadZNodeParentPath(), "");
-
- logger.info("initialize server nodes success.");
- } catch (Exception e) {
- logger.error("init system znode failed",e);
- }
- }
-
- /**
- * get host ip, string format: masterParentPath/ip
- * @param path path
- * @return host ip, string format: masterParentPath/ip
- */
- protected String getHostByEventDataPath(String path) {
- if(StringUtils.isEmpty(path)){
- logger.error("empty path!");
- return "";
- }
- String[] pathArray = path.split(SINGLE_SLASH);
- if(pathArray.length < 1){
- logger.error("parse ip error: {}", path);
- return "";
- }
- return pathArray[pathArray.length - 1];
-
- }
-
- @Override
- public String toString() {
- return "AbstractZKClient{" +
- "zkClient=" + zkClient +
- ", deadServerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
- ", masterZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
- ", workerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
- '}';
- }
-}
\ No newline at end of file
+public abstract class AbstractZKClient extends RegisterOperator {
+
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
+
+ /**
+ * get active master num
+ *
+ * @return active master number
+ */
+ public int getActiveMasterNum() {
+ List<String> childrenList = new ArrayList<>();
+ try {
+ // read master node parent path from conf
+ if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
+ childrenList =
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
+ }
+ } catch (Exception e) {
+ logger.error("getActiveMasterNum error", e);
+ }
+ return childrenList.size();
+ }
+
+ /**
+ * @return zookeeper quorum
+ */
+ public String getZookeeperQuorum() {
+ return getZookeeperConfig().getServerList();
+ }
+
+ /**
+ * get server list.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return server list
+ */
+ public List<Server> getServersList(ZKNodeType zkNodeType) {
+ Map<String, String> masterMap = getServerMaps(zkNodeType);
+ String parentPath = getZNodeParentPath(zkNodeType);
+
+ List<Server> masterServers = new ArrayList<>();
+ for (Map.Entry<String, String> entry : masterMap.entrySet()) {
+ Server masterServer =
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+ if (masterServer == null) {
+ continue;
+ }
+ String key = entry.getKey();
+ masterServer.setZkDirectory(parentPath + "/" + key);
+ //set host and port
+ String[] hostAndPort = key.split(COLON);
+ String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+ // fetch the last one
+ masterServer.setHost(hosts[hosts.length - 1]);
+ masterServer.setPort(Integer.parseInt(hostAndPort[1]));
+ masterServers.add(masterServer);
+ }
+ return masterServers;
+ }
+
+ /**
+ * get master server list map.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return result : {host : resource info}
+ */
+ public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
+
+ Map<String, String> masterMap = new HashMap<>();
+ try {
+ String path = getZNodeParentPath(zkNodeType);
+ List<String> serverList = super.getChildrenKeys(path);
+ if (zkNodeType == ZKNodeType.WORKER) {
+ List<String> workerList = new ArrayList<>();
+ for (String group : serverList) {
+ List<String> groupServers = super.getChildrenKeys(path +
Constants.SLASH + group);
+ for (String groupServer : groupServers) {
+ workerList.add(group + Constants.SLASH + groupServer);
+ }
+ }
+ serverList = workerList;
+ }
+ for (String server : serverList) {
+ masterMap.putIfAbsent(server, super.get(path + Constants.SLASH
+ server));
+ }
+ } catch (Exception e) {
+ logger.error("get server list failed", e);
+ }
+
+ return masterMap;
+ }
+
+ /**
+ * check the zookeeper node already exists
+ *
+ * @param host host
+ * @param zkNodeType zookeeper node type
+ * @return true if exists
+ */
+ public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+ String path = getZNodeParentPath(zkNodeType);
+ if (StringUtils.isEmpty(path)) {
+ logger.error("check zk node exists error, host:{}, zk node
type:{}",
+ host, zkNodeType);
+ return false;
+ }
+ Map<String, String> serverMaps = getServerMaps(zkNodeType);
+ for (String hostKey : serverMaps.keySet()) {
+ if (hostKey.contains(host)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return get worker node parent path
+ */
+ protected String getWorkerZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
+ }
+
+ /**
+ * @return get master node parent path
+ */
+ protected String getMasterZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
+ }
+
+ /**
+ * @return get master lock path
+ */
+ public String getMasterLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
+ }
+
+ /**
+ * @param zkNodeType zookeeper node type
+ * @return get zookeeper node parent path
+ */
+ public String getZNodeParentPath(ZKNodeType zkNodeType) {
+ String path = "";
+ switch (zkNodeType) {
+ case MASTER:
+ return getMasterZNodeParentPath();
+ case WORKER:
+ return getWorkerZNodeParentPath();
+ case DEAD_SERVER:
+ return getDeadZNodeParentPath();
+ default:
+ break;
+ }
+ return path;
+ }
+
+
+ /**
+ * @return get master start up lock path
+ */
+ public String getMasterStartUpLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+ }
+
+ /**
+ * @return get master failover lock path
+ */
+ public String getMasterFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+ }
+
+ /**
+ * @return get worker failover lock path
+ */
+ public String getWorkerFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+ }
+
+ /**
+ * release mutex
+ *
+ * @param mutex mutex
+ */
+ public void releaseMutex(InterProcessMutex mutex) {
+ if (mutex != null) {
+ try {
+ mutex.release();
+ } catch (Exception e) {
+ if ("instance must be started before calling this
method".equals(e.getMessage())) {
+ logger.warn("lock release");
+ } else {
+ logger.error("lock release failed", e);
+ }
+
+ }
+ }
+ }
+
+ /**
+ * init system znode
+ */
+ protected void initSystemZNode() {
+ try {
+ persist(getMasterZNodeParentPath(), "");
+ persist(getWorkerZNodeParentPath(), "");
+ persist(getDeadZNodeParentPath(), "");
+
+ logger.info("initialize server nodes success.");
+ } catch (Exception e) {
+ logger.error("init system znode failed", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractZKClient{"
+ + "zkClient=" + getZkClient()
+ + ", deadServerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ + ", masterZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ + ", workerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ + '}';
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
new file mode 100644
index 0000000..0fd4a4f
--- /dev/null
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * register operator
+ */
+@Component
+public class RegisterOperator extends ZookeeperCachedOperator {
+
+ private final Logger logger =
LoggerFactory.getLogger(RegisterOperator.class);
+
+ /**
+ * @return get dead server node parent path
+ */
+ protected String getDeadZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+ }
+
+ /**
+ * remove dead server by host
+ *
+ * @param host host
+ * @param serverType serverType
+ * @throws Exception
+ */
+ public void removeDeadServerByHost(String host, String serverType) throws
Exception {
+ List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
+ for (String serverPath : deadServers) {
+ if (serverPath.startsWith(serverType + UNDERLINE + host)) {
+ String server = getDeadZNodeParentPath() + SINGLE_SLASH +
serverPath;
+ super.remove(server);
+ logger.info("{} server {} deleted from zk dead server path
success", serverType, host);
+ }
+ }
+ }
+
+ /**
+ * get host ip, string format: masterParentPath/ip
+ *
+ * @param path path
+ * @return host ip, string format: masterParentPath/ip
+ */
+ protected String getHostByEventDataPath(String path) {
+ if (StringUtils.isEmpty(path)) {
+ logger.error("empty path!");
+ return "";
+ }
+ String[] pathArray = path.split(SINGLE_SLASH);
+ if (pathArray.length < 1) {
+ logger.error("parse ip error: {}", path);
+ return "";
+ }
+ return pathArray[pathArray.length - 1];
+
+ }
+
+ /**
+ * opType(add): if find dead server , then add to zk deadServerPath
+ * opType(delete): delete path from zk
+ *
+ * @param zNode node path
+ * @param zkNodeType master or worker
+ * @param opType delete or add
+ * @throws Exception errors
+ */
+ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String
opType) throws Exception {
+ String host = getHostByEventDataPath(zNode);
+ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX :
WORKER_PREFIX;
+
+ //check server restart, if restart , dead server path in zk should be
delete
+ if (opType.equals(DELETE_ZK_OP)) {
+ removeDeadServerByHost(host, type);
+
+ } else if (opType.equals(ADD_ZK_OP)) {
+ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH +
type + UNDERLINE + host;
+ if (!super.isExisted(deadServerPath)) {
+ //add dead server info to zk dead server path : /dead-servers/
+
+ super.persist(deadServerPath, (type + UNDERLINE + host));
+
+ logger.info("{} server dead , and {} added to zk dead server
path success",
+ zkNodeType, zNode);
+ }
+ }
+
+ }
+
+ /**
+ * opType(add): if find dead server , then add to zk deadServerPath
+ * opType(delete): delete path from zk
+ *
+ * @param zNodeSet node path set
+ * @param zkNodeType master or worker
+ * @param opType delete or add
+ * @throws Exception errors
+ */
+ public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType,
String opType) throws Exception {
+
+ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX :
WORKER_PREFIX;
+ for (String zNode : zNodeSet) {
+ String host = getHostByEventDataPath(zNode);
+ //check server restart, if restart , dead server path in zk should
be delete
+ if (opType.equals(DELETE_ZK_OP)) {
+ removeDeadServerByHost(host, type);
+
+ } else if (opType.equals(ADD_ZK_OP)) {
+ String deadServerPath = getDeadZNodeParentPath() +
SINGLE_SLASH + type + UNDERLINE + host;
+ if (!super.isExisted(deadServerPath)) {
+ //add dead server info to zk dead server path :
/dead-servers/
+
+ super.persist(deadServerPath, (type + UNDERLINE + host));
+
+ logger.info("{} server dead , and {} added to zk dead
server path success",
+ zkNodeType, zNode);
+ }
+ }
+
+ }
+
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
index 5bdc6f8..57ac13e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
@@ -52,6 +52,9 @@ public class ZookeeperConfig {
@Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
private String dsRoot;
+ @Value("${zookeeper.max.wait.time:10000}")
+ private int maxWaitTime;
+
public String getServerList() {
return serverList;
}
@@ -115,4 +118,12 @@ public class ZookeeperConfig {
public void setDsRoot(String dsRoot) {
this.dsRoot = dsRoot;
}
+
+ public int getMaxWaitTime() {
+ return maxWaitTime;
+ }
+
+ public void setMaxWaitTime(int maxWaitTime) {
+ this.maxWaitTime = maxWaitTime;
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
new file mode 100644
index 0000000..f828c07
--- /dev/null
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * register operator test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class RegisterOperatorTest {
+
+ private static ZKServer zkServer;
+
+ @InjectMocks
+ private RegisterOperator registerOperator;
+
+ @Mock
+ private ZookeeperConfig zookeeperConfig;
+
+ private static final String DS_ROOT = "/dolphinscheduler";
+ private static final String MASTER_NODE = "127.0.0.1:5678";
+
+ @Before
+ public void before() {
+ new Thread(() -> {
+ if (zkServer == null) {
+ zkServer = new ZKServer();
+ }
+ zkServer.startLocalZkServer(2185);
+ }).start();
+ }
+
+ @Test
+ public void testAfterPropertiesSet() throws Exception {
+ TimeUnit.SECONDS.sleep(10);
+
Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185");
+ Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
+ Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
+ Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
+ Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
+
Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
+ Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
+ Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
+ Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
+
+ registerOperator.afterPropertiesSet();
+ Assert.assertNotNull(registerOperator.getZkClient());
+ }
+
+ @After
+ public void after() {
+ if (zkServer != null) {
+ zkServer.stop();
+ }
+ }
+
+ @Test
+ public void testGetDeadZNodeParentPath() throws Exception {
+
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+
+ Assert.assertEquals(DS_ROOT +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
+ }
+
+ @Test
+ public void testHandleDeadServer() throws Exception {
+ testAfterPropertiesSet();
+ registerOperator.handleDeadServer(MASTER_NODE,
ZKNodeType.MASTER,Constants.ADD_ZK_OP);
+ String path = registerOperator.getDeadZNodeParentPath();
+
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+
+ }
+
+ @Test
+ public void testRemoveDeadServerByHost() throws Exception {
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+
+ registerOperator.handleDeadServer(MASTER_NODE,
ZKNodeType.MASTER,Constants.ADD_ZK_OP);
+
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+
+
registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX);
+
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index aff1f98..fd0ea39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -817,6 +817,7 @@
<include>**/server/master/MasterExecThreadTest.java</include> -->
<include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
+
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
@@ -838,6 +839,8 @@
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>
+
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
+
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/TaskPriorityTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>