This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 055bb28 worker fault tolerance modify (#2212)
055bb28 is described below
commit 055bb28de41d58183d34bf51a880280503d93099
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 17 18:45:39 2020 +0800
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <[email protected]>
---
.../api/controller/LoggerController.java | 8 +-
.../apache/dolphinscheduler/common/Constants.java | 124 +--------------------
.../builder/TaskExecutionContextBuilder.java | 1 +
.../server/utils/ProcessUtils.java | 21 ++--
.../dolphinscheduler/server/zk/ZKMasterClient.java | 45 ++++----
.../service/zk/AbstractZKClient.java | 58 +++-------
.../service/zk/ZookeeperCachedOperator.java | 2 +-
dolphinscheduler-ui/.env | 2 +-
8 files changed, 59 insertions(+), 202 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
index 802f09f..eefd6ba 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
@@ -60,14 +60,14 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "queryLog", notes= "QUERY_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "taskInstId", value = "TASK_ID", dataType
= "Int", example = "100"),
+ @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID",
dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM",
dataType ="Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", dataType
="Int", example = "100")
})
@GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK)
public Result queryLog(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskInstId") int
taskInstanceId,
+ @RequestParam(value = "taskInstanceId") int
taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
try {
@@ -91,12 +91,12 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "downloadTaskLog", notes=
"DOWNLOAD_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "taskInstId", value = "TASK_ID",dataType
= "Int", example = "100")
+ @ApiImplicitParam(name = "taskInstanceId", value =
"TASK_ID",dataType = "Int", example = "100")
})
@GetMapping(value = "/download-log")
@ResponseBody
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskInstId")
int taskInstanceId) {
+ @RequestParam(value =
"taskInstanceId") int taskInstanceId) {
try {
byte[] logBytes = loggerService.getLogBytes(taskInstanceId);
return ResponseEntity
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 67ce5fd..2aded0f 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -119,19 +119,14 @@ public final class Constants {
public static final String RES_UPLOAD_STARTUP_TYPE =
"res.upload.startup.type";
/**
- * zookeeper quorum
- */
- public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
-
- /**
* MasterServer directory registered in zookeeper
*/
- public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters";
+ public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS =
"/nodes/masters";
/**
* WorkerServer directory registered in zookeeper
*/
- public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers";
+ public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS =
"/nodes/worker";
/**
* all servers directory registered in zookeeper
@@ -143,10 +138,6 @@ public final class Constants {
*/
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS =
"/lock/masters";
- /**
- * WorkerServer lock directory registered in zookeeper
- */
- public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS =
"/lock/workers";
/**
* MasterServer failover directory registered in zookeeper
@@ -163,10 +154,6 @@ public final class Constants {
*/
public static final String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS =
"/lock/failover/startup-masters";
- /**
- * need send warn times when master server or worker server failover
- */
- public static final int DOLPHINSCHEDULER_WARN_TIMES_FAILOVER = 3;
/**
* comma ,
@@ -203,37 +190,6 @@ public final class Constants {
*/
public static final String EQUAL_SIGN = "=";
- /**
- * ZOOKEEPER_SESSION_TIMEOUT
- */
- public static final String ZOOKEEPER_SESSION_TIMEOUT =
"zookeeper.session.timeout";
-
- public static final String ZOOKEEPER_CONNECTION_TIMEOUT =
"zookeeper.connection.timeout";
-
- public static final String ZOOKEEPER_RETRY_SLEEP = "zookeeper.retry.sleep";
- public static final String ZOOKEEPER_RETRY_BASE_SLEEP =
"zookeeper.retry.base.sleep";
- public static final String ZOOKEEPER_RETRY_MAX_SLEEP =
"zookeeper.retry.max.sleep";
-
- public static final String ZOOKEEPER_RETRY_MAXTIME =
"zookeeper.retry.maxtime";
-
-
- public static final String MASTER_HEARTBEAT_INTERVAL =
"master.heartbeat.interval";
-
- public static final String MASTER_EXEC_THREADS = "master.exec.threads";
-
- public static final String MASTER_EXEC_TASK_THREADS =
"master.exec.task.number";
-
-
- public static final String MASTER_COMMIT_RETRY_TIMES =
"master.task.commit.retryTimes";
-
- public static final String MASTER_COMMIT_RETRY_INTERVAL =
"master.task.commit.interval";
-
-
- public static final String WORKER_EXEC_THREADS = "worker.exec.threads";
-
- public static final String WORKER_HEARTBEAT_INTERVAL =
"worker.heartbeat.interval";
-
- public static final String WORKER_FETCH_TASK_NUM = "worker.fetch.task.num";
public static final String WORKER_MAX_CPULOAD_AVG =
"worker.max.cpuload.avg";
@@ -244,17 +200,6 @@ public final class Constants {
public static final String MASTER_RESERVED_MEMORY =
"master.reserved.memory";
- /**
- * dolphinscheduler tasks queue
- */
- public static final String DOLPHINSCHEDULER_TASKS_QUEUE = "tasks_queue";
-
- /**
- * dolphinscheduler need kill tasks queue
- */
- public static final String DOLPHINSCHEDULER_TASKS_KILL = "tasks_kill";
-
- public static final String ZOOKEEPER_DOLPHINSCHEDULER_ROOT =
"zookeeper.dolphinscheduler.root";
public static final String SCHEDULER_QUEUE_IMPL =
"dolphinscheduler.queue.impl";
@@ -351,26 +296,6 @@ public final class Constants {
/**
- * heartbeat threads number
- */
- public static final int DEFAUL_WORKER_HEARTBEAT_THREAD_NUM = 1;
-
- /**
- * heartbeat interval
- */
- public static final int DEFAULT_WORKER_HEARTBEAT_INTERVAL = 60;
-
- /**
- * worker fetch task number
- */
- public static final int DEFAULT_WORKER_FETCH_TASK_NUM = 1;
-
- /**
- * worker execute threads number
- */
- public static final int DEFAULT_WORKER_EXEC_THREAD_NUM = 10;
-
- /**
* master cpu load
*/
public static final int DEFAULT_MASTER_CPU_LOAD =
Runtime.getRuntime().availableProcessors() * 2;
@@ -391,16 +316,6 @@ public final class Constants {
public static final double DEFAULT_WORKER_RESERVED_MEMORY =
OSUtils.totalMemorySize() / 10;
- /**
- * master execute threads number
- */
- public static final int DEFAULT_MASTER_EXEC_THREAD_NUM = 100;
-
-
- /**
- * default master concurrent task execute num
- */
- public static final int DEFAULT_MASTER_TASK_EXEC_NUM = 20;
/**
* default log cache rows num,output when reach the number
@@ -408,34 +323,12 @@ public final class Constants {
public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
/**
- * log flush interval,output when reach the interval
+ * log flush interval, output when reach the interval
*/
public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000;
/**
- * default master heartbeat thread number
- */
- public static final int DEFAULT_MASTER_HEARTBEAT_THREAD_NUM = 1;
-
-
- /**
- * default master heartbeat interval
- */
- public static final int DEFAULT_MASTER_HEARTBEAT_INTERVAL = 60;
-
- /**
- * default master commit retry times
- */
- public static final int DEFAULT_MASTER_COMMIT_RETRY_TIMES = 5;
-
-
- /**
- * default master commit retry interval
- */
- public static final int DEFAULT_MASTER_COMMIT_RETRY_INTERVAL = 3000;
-
- /**
* time unit secong to minutes
*/
public static final int SEC_2_MINUTES_TIME_UNIT = 60;
@@ -805,7 +698,6 @@ public final class Constants {
public static final String ALIAS = "alias";
public static final String CONTENT = "content";
public static final String DEPENDENT_SPLIT = ":||";
- public static final String DEPENDENT_ALL = "ALL";
/**
@@ -864,7 +756,7 @@ public final class Constants {
*/
public static final String HIVE_CONF = "hiveconf:";
- //flink 任务
+ //flink task
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys";
@@ -899,26 +791,20 @@ public final class Constants {
/**
* data total
- * 数据总数
*/
public static final String COUNT = "count";
/**
* page size
- * 每页数据条数
*/
public static final String PAGE_SIZE = "pageSize";
/**
* current page no
- * 当前页码
*/
public static final String PAGE_NUMBER = "pageNo";
- /**
- * result
- */
- public static final String RESULT = "result";
+
/**
*
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 08c105a..fc60e88 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -49,6 +49,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
+ taskExecutionContext.setHost(taskInstance.getHost());
return this;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index f29e7df..12cd66f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class ProcessUtils {
allowAmbiguousCommands = true;
String value =
System.getProperty("jdk.lang.Process.allowAmbiguousCommands");
if (value != null) {
- allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
+ allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
}
}
if (allowAmbiguousCommands) {
@@ -68,7 +69,7 @@ public class ProcessUtils {
String executablePath = new File(cmd[0]).getPath();
if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
- executablePath = quoteString(executablePath);
+ executablePath = quoteString(executablePath);
}
cmdstr = createCommandLine(
@@ -81,7 +82,7 @@ public class ProcessUtils {
StringBuilder join = new StringBuilder();
for (String s : cmd) {
- join.append(s).append(' ');
+ join.append(s).append(' ');
}
cmd = getTokensFromCommand(join.toString());
@@ -89,7 +90,7 @@ public class ProcessUtils {
// Check new executable name once more
if (security != null) {
- security.checkExec(executablePath);
+ security.checkExec(executablePath);
}
}
@@ -147,7 +148,7 @@ public class ProcessUtils {
ArrayList<String> matchList = new ArrayList<>(8);
Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
while (regexMatcher.find()) {
- matchList.add(regexMatcher.group());
+ matchList.add(regexMatcher.group());
}
return matchList.toArray(new String[matchList.size()]);
}
@@ -323,9 +324,9 @@ public class ProcessUtils {
try {
int processId = taskExecutionContext.getProcessId();
if(processId == 0 ){
- logger.error("process kill failed, process id :{}, task id:{}",
- processId, taskExecutionContext.getTaskInstanceId());
- return ;
+ logger.error("process kill failed, process id :{}, task id:{}",
+ processId, taskExecutionContext.getTaskInstanceId());
+ return ;
}
String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
@@ -379,7 +380,9 @@ public class ProcessUtils {
String log = null;
try {
logClient = new LogClientService();
- log = logClient.viewLog(taskExecutionContext.getHost(),
Constants.RPC_PORT, taskExecutionContext.getLogPath());
+ log =
logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
+ Constants.RPC_PORT,
+ taskExecutionContext.getLogPath());
} finally {
if(logClient != null){
logClient.close();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index a59cf3e..0e9a839 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -59,7 +59,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private ProcessService processService;
- public void start() {
+ public void start() {
InterProcessMutex mutex = null;
try {
@@ -71,7 +71,7 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
- // check if fault tolerance is required,failure and
tolerance
+ // check if fault tolerance is required, failure and
tolerance
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
@@ -146,8 +146,8 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception
*/
private void failoverServerWhenDown(String serverHost, ZKNodeType
zkNodeType) throws Exception {
- if(StringUtils.isEmpty(serverHost)){
- return ;
+ if(StringUtils.isEmpty(serverHost)){
+ return ;
}
switch (zkNodeType){
case MASTER:
@@ -217,7 +217,7 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* task needs failover if task start before worker starts
- *
+ *
* @param taskInstance task instance
* @return true if task instance need fail over
*/
@@ -231,10 +231,10 @@ public class ZKMasterClient extends AbstractZKClient {
}
// if the worker node exists in zookeeper, we must check the
task starts after the worker
- if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
- //if task start after worker starts, there is no need to
failover the task.
- if(checkTaskAfterWorkerStart(taskInstance)){
- taskNeedFailover = false;
+ if(checkZKNodeExists(taskInstance.getHost(),
ZKNodeType.WORKER)){
+ //if task start after worker starts, there is no need
to failover the task.
+ if(checkTaskAfterWorkerStart(taskInstance)){
+ taskNeedFailover = false;
}
}
return taskNeedFailover;
@@ -247,15 +247,15 @@ public class ZKMasterClient extends AbstractZKClient {
* @return true if task instance start time after worker server start
date
*/
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
- if(StringUtils.isEmpty(taskInstance.getHost())){
- return false;
+ if(StringUtils.isEmpty(taskInstance.getHost())){
+ return false;
}
- Date workerServerStartDate = null;
- List<Server> workerServers = getServersList(ZKNodeType.WORKER);
- for(Server workerServer : workerServers){
- if(workerServer.getHost().equals(taskInstance.getHost())){
- workerServerStartDate = workerServer.getCreateTime();
- break;
+ Date workerServerStartDate = null;
+ List<Server> workerServers = getServersList(ZKNodeType.WORKER);
+ for(Server workerServer : workerServers){
+
if(workerServer.getHost().equals(taskInstance.getHost())){
+ workerServerStartDate =
workerServer.getCreateTime();
+ break;
}
}
@@ -271,7 +271,7 @@ public class ZKMasterClient extends AbstractZKClient {
*
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
+ * 3. failover all tasks when workerHost is null
* @param workerHost worker host
*/
@@ -293,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient {
if(needCheckWorkerAlive){
if(!checkTaskInstanceNeedFailover(taskInstance)){
continue;
- }
+ }
}
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
@@ -304,7 +304,6 @@ public class ZKMasterClient extends AbstractZKClient {
TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
- .buildProcessDefinitionRelatedInfo(null)
.create();
// only kill yarn job if exists , the local thread has
exited
ProcessUtils.killYarnJob(taskExecutionContext);
@@ -334,9 +333,9 @@ public class ZKMasterClient extends AbstractZKClient {
}
public InterProcessMutex blockAcquireMutex() throws Exception {
- InterProcessMutex mutex = new InterProcessMutex(getZkClient(),
getMasterLockPath());
- mutex.acquire();
- return mutex;
+ InterProcessMutex mutex = new InterProcessMutex(getZkClient(),
getMasterLockPath());
+ mutex.acquire();
+ return mutex;
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 0b9fbe4..106dbc1 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -38,56 +38,24 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
+
/**
- * check dead server or not , if dead, stop self
- *
- * @param zNode node path
- * @param serverType master or worker prefix
- * @return true if not exists
- * @throws Exception errors
+ * remove dead server by host
+ * @param host host
+ * @param serverType serverType
+ * @throws Exception
*/
- protected boolean checkIsDeadServer(String zNode, String serverType)
throws Exception{
- //ip_sequenceno
- String[] zNodesPath = zNode.split("\\/");
- String ipSeqNo = zNodesPath[zNodesPath.length - 1];
-
- String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX
: WORKER_PREFIX;
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH
+ type + UNDERLINE + ipSeqNo;
-
- if(!isExisted(zNode) || isExisted(deadServerPath)){
- return true;
- }
- return false;
- }
-
-
public void removeDeadServerByHost(String host, String serverType)
throws Exception {
- List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
- for(String serverPath : deadServers){
- if(serverPath.startsWith(serverType+UNDERLINE+host)){
- String server = getDeadZNodeParentPath() + SINGLE_SLASH +
serverPath;
- super.remove(server);
+ List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
+ for(String serverPath : deadServers){
+ if(serverPath.startsWith(serverType+UNDERLINE+host)){
+ String server = getDeadZNodeParentPath() +
SINGLE_SLASH + serverPath;
+ super.remove(server);
logger.info("{} server {} deleted from zk dead
server path success" , serverType , host);
- }
- }
+ }
+ }
}
- /**
- * create zookeeper path according the zk node type.
- * @param zkNodeType zookeeper node type
- * @return register zookeeper path
- * @throws Exception
- */
- private String createZNodePath(ZKNodeType zkNodeType, String host)
throws Exception {
- // specify the format of stored data in ZK nodes
- String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
- // create temporary sequence nodes for master znode
- String registerPath= getZNodeParentPath(zkNodeType) +
SINGLE_SLASH + host;
-
- super.persistEphemeral(registerPath, heartbeatZKInfo);
- logger.info("register {} node {} success" ,
zkNodeType.toString(), registerPath);
- return registerPath;
- }
/**
* opType(add): if find dead server , then add to zk deadServerPath
@@ -326,7 +294,7 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
*/
protected String getHostByEventDataPath(String path) {
if(StringUtils.isEmpty(path)){
- logger.error("empty path!");
+ logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 6c38a68..e71cb74 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -39,7 +39,7 @@ public class ZookeeperCachedOperator extends
ZookeeperOperator {
*/
@Override
protected void registerListener() {
- treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot());
+ treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() +
"/nodes");
logger.info("add listener to zk path: {}",
getZookeeperConfig().getDsRoot());
try {
treeCache.start();
diff --git a/dolphinscheduler-ui/.env b/dolphinscheduler-ui/.env
index 4c7e96e..e676be6 100644
--- a/dolphinscheduler-ui/.env
+++ b/dolphinscheduler-ui/.env
@@ -17,4 +17,4 @@
API_BASE = http://192.168.xx.xx:12345
# If IP access is required for local development, remove the "#"
-#DEV_HOST = 192.168.xx.xx
\ No newline at end of file
+#DEV_HOST = 192.168.xx.xx