This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
new af78727 [1.3.6-prepare][Fix-5037][Server] Fix that both the master
and the worker are hanging after restarting and stopping #5038 (#5039)
af78727 is described below
commit af78727b6b22d25b589ae8b4fd88fda9ed7f9d21
Author: Shiwen Cheng <[email protected]>
AuthorDate: Fri Mar 12 09:39:23 2021 +0800
[1.3.6-prepare][Fix-5037][Server] Fix that both the master and the worker
are hanging after restarting and stopping #5038 (#5039)
---
.github/workflows/ci_ut.yml | 2 +-
.../apache/dolphinscheduler/common/IStoppable.java | 14 ++---
.../server/master/MasterServer.java | 63 +++++++---------------
.../server/master/registry/MasterRegistry.java | 4 +-
.../master/runner/MasterSchedulerService.java | 17 +++---
.../server/registry/HeartBeatTask.java | 5 +-
.../server/registry/ZookeeperRegistryCenter.java | 8 +--
.../server/worker/WorkerServer.java | 42 ++++++---------
.../server/worker/registry/WorkerRegistry.java | 6 ++-
.../dolphinscheduler/server/zk/ZKMasterClient.java | 24 ++++-----
.../executor/NettyExecutorManagerTest.java | 2 +-
.../server/master/registry/MasterRegistryTest.java | 3 +-
.../service/zk/ZookeeperOperator.java | 13 ++---
.../service/zk/RegisterOperatorTest.java | 15 ++++++
14 files changed, 104 insertions(+), 114 deletions(-)
diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml
index 48a0d26..8c5e27d 100644
--- a/.github/workflows/ci_ut.yml
+++ b/.github/workflows/ci_ut.yml
@@ -48,7 +48,7 @@ jobs:
- name: Bootstrap database
run: |
sed -i "s/: root/: test/g"
$(pwd)/docker/docker-swarm/docker-compose.yml
- docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml
create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
+ docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up
--no-start --force-recreate dolphinscheduler-zookeeper
dolphinscheduler-postgresql
sudo cp $(pwd)/sql/dolphinscheduler_postgre.sql $(docker volume
inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" |
awk -F "\"" '{print $4}')
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up
-d dolphinscheduler-zookeeper dolphinscheduler-postgresql
- name: Set up JDK 1.8
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
index 0f6f40b..c685b89 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common;
/**
* server stop interface.
*/
public interface IStoppable {
- /**
- * Stop this service.
- * @param cause why stopping
- */
- void stop(String cause);
-}
\ No newline at end of file
+ /**
+ * Stop this service.
+ * @param cause why stopping
+ */
+ void stop(String cause);
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index f1962c7..e03e8e8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
@@ -26,7 +27,6 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -61,8 +61,8 @@ public class MasterServer implements IStoppable {
private MasterConfig masterConfig;
/**
- * spring application context
- * only use it for initialization
+ * spring application context
+ * only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
@@ -73,12 +73,6 @@ public class MasterServer implements IStoppable {
private NettyRemotingServer nettyRemotingServer;
/**
- * master registry
- */
- @Autowired
- private MasterRegistry masterRegistry;
-
- /**
* zk master client
*/
@Autowired
@@ -91,9 +85,8 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService;
/**
- * master server startup
+ * master server startup, not use web service
*
- * master server not use web service
* @param args arguments
*/
public static void main(String[] args) {
@@ -106,22 +99,14 @@ public class MasterServer implements IStoppable {
*/
@PostConstruct
public void run() {
- try {
- //init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(masterConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
- this.nettyRemotingServer.start();
-
-
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
-
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException(e);
- }
+ // init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(masterConfig.getListenPort());
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
+ this.nettyRemotingServer.start();
// self tolerant
this.zkMasterClient.start(this);
@@ -144,14 +129,11 @@ public class MasterServer implements IStoppable {
}
/**
- * register hooks, which are called before the process exits
+ * register hooks, which are called before the process exits
*/
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- if (Stopper.isRunning()) {
- close("shutdownHook");
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
}
}));
@@ -165,7 +147,7 @@ public class MasterServer implements IStoppable {
public void close(String cause) {
try {
- //execute only once
+ // execute only once
if (Stopper.isStopped()) {
return;
}
@@ -176,28 +158,24 @@ public class MasterServer implements IStoppable {
Stopper.stop();
try {
- //thread sleep 3 seconds for thread quietly stop
+ // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
- //
+ // close
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
- this.masterRegistry.unRegistry();
this.zkMasterClient.close();
- //close quartz
+ // close quartz
try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
} catch (Exception e) {
logger.warn("Quartz service stopped exception:{}",
e.getMessage());
}
-
} catch (Exception e) {
logger.error("master server stop exception ", e);
- } finally {
- System.exit(-1);
}
}
@@ -206,4 +184,3 @@ public class MasterServer implements IStoppable {
close(cause);
}
}
-
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 0624f0c..a865809 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -116,6 +116,8 @@ public class MasterRegistry {
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address);
+ heartBeatExecutor.shutdown();
+ logger.info("heartbeat executor shutdown");
}
/**
@@ -130,7 +132,7 @@ public class MasterRegistry {
* get local address
* @return
*/
- private String getLocalAddress(){
+ private String getLocalAddress() {
return OSUtils.getAddr(masterConfig.getListenPort());
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 0985c1f..cd18c46 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -16,8 +16,6 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -29,15 +27,20 @@ import
org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
/**
* master scheduler thread
*/
@@ -167,7 +170,7 @@ public class MasterSchedulerService extends Thread {
}
}
- private String getLocalAddress(){
+ private String getLocalAddress() {
return OSUtils.getAddr(masterConfig.getListenPort());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 90d6ea3..ea187c2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -43,9 +43,8 @@ public class HeartBeatTask implements Runnable {
private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
- /**
- * server stop or not
- */
+
+ // server stop or not
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 9017a13..591333b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -227,18 +227,14 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
* @throws Exception errors
*/
protected boolean checkIsDeadServer(String zNode, String serverType)
throws Exception {
- //ip_sequenceno
+ // ip_sequence_no
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX :
WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type
+ UNDERLINE + ipSeqNo;
- if (!registerOperator.isExisted(zNode) ||
registerOperator.isExisted(deadServerPath)) {
- return true;
- }
-
- return false;
+ return !registerOperator.isExisted(zNode) ||
registerOperator.isExisted(deadServerPath);
}
public RegisterOperator getRegisterOperator() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index b408f6b..aa5600a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
@@ -34,6 +35,8 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Set;
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +44,6 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
-import javax.annotation.PostConstruct;
-
/**
* worker server
*/
@@ -55,25 +56,25 @@ public class WorkerServer implements IStoppable {
private static final Logger logger =
LoggerFactory.getLogger(WorkerServer.class);
/**
- * netty remote server
+ * netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
/**
- * worker registry
+ * worker registry
*/
@Autowired
private WorkerRegistry workerRegistry;
/**
- * worker config
+ * worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
- * spring application context
- * only use it for initialization
+ * spring application context
+ * only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
@@ -82,9 +83,8 @@ public class WorkerServer implements IStoppable {
private RetryReportTaskStatusThread retryReportTaskStatusThread;
/**
- * worker server startup
+ * worker server startup, not use web service
*
- * worker server not use web service
* @param args arguments
*/
public static void main(String[] args) {
@@ -92,15 +92,12 @@ public class WorkerServer implements IStoppable {
new
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
-
/**
* worker server run
*/
@PostConstruct
- public void run(){
- logger.info("start worker server...");
-
- //init remoting server
+ public void run() {
+ // init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
@@ -127,12 +124,9 @@ public class WorkerServer implements IStoppable {
/**
* register hooks, which are called before the process exits
*/
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- if (Stopper.isRunning()) {
- close("shutdownHook");
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
}
}));
}
@@ -140,7 +134,7 @@ public class WorkerServer implements IStoppable {
public void close(String cause) {
try {
- //execute only once
+ // execute only once
if (Stopper.isStopped()) {
return;
}
@@ -151,19 +145,17 @@ public class WorkerServer implements IStoppable {
Stopper.stop();
try {
- //thread sleep 3 seconds for thread quitely stop
+ // thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception", e);
}
+ // close
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
-
} catch (Exception e) {
logger.error("worker server stop exception ", e);
- } finally {
- System.exit(-1);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 01e4554..d7b1fc0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.registry;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
@@ -45,7 +46,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
-
/**
* worker registry
*/
@@ -113,6 +113,7 @@ public class WorkerRegistry {
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ",
address);
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
}
});
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
@@ -140,6 +141,7 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry from ZK {}.", address,
workerZkPath);
}
this.heartBeatExecutor.shutdownNow();
+ logger.info("heartbeat executor shutdown");
}
/**
@@ -169,7 +171,7 @@ public class WorkerRegistry {
* get local address
* @return local address
*/
- private String getLocalAddress(){
+ private String getLocalAddress() {
return OSUtils.getAddr(workerConfig.getListenPort());
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 2d8eed9..b313a1d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.zk;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -37,16 +35,18 @@ import
org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.Date;
import java.util.List;
-import static org.apache.dolphinscheduler.common.Constants.*;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
* zookeeper master client
@@ -81,7 +81,7 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();
- // Master registry
+ // master registry
masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
String registPath = this.masterRegistry.getMasterPath();
@@ -108,8 +108,8 @@ public class ZKMasterClient extends AbstractZKClient {
@Override
public void close() {
- super.close();
masterRegistry.unRegistry();
+ super.close();
}
/**
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index de5d052..98d58a8 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -78,7 +78,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new
ExecutionContext(context.toCommand(), ExecutorType.WORKER);
- executionContext.setHost(Host.of(OSUtils.getAddr(OSUtils.getHost(),
serverConfig.getListenPort())));
+
executionContext.setHost(Host.of(OSUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
nettyRemotingServer.close();
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index 2488d59..6fd5a73 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -59,7 +60,7 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2);
//wait heartbeat info write into zk node
- String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS +
":" + masterConfig.getListenPort());
+ String masterNodePath = masterPath + "/" +
OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort());
String heartbeat =
zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH,
heartbeat.split(",").length);
masterRegistry.unRegistry();
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index 4c5691b..b635dc7 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
@@ -55,7 +56,7 @@ public class ZookeeperOperator implements InitializingBean {
protected CuratorFramework zkClient;
@Override
- public void afterPropertiesSet() throws Exception {
+ public void afterPropertiesSet() {
this.zkClient = buildClient();
initStateListener();
treeCacheStart();
@@ -72,11 +73,11 @@ public class ZookeeperOperator implements InitializingBean {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState)
-> {
- if(newState == ConnectionState.LOST){
+ if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
- } else if(newState == ConnectionState.RECONNECTED){
+ } else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
- } else if(newState == ConnectionState.SUSPENDED){
+ } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
}
});
@@ -142,8 +143,8 @@ public class ZookeeperOperator implements InitializingBean {
}
}
- public boolean hasChildren(final String key){
- Stat stat ;
+ public boolean hasChildren(final String key) {
+ Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
index f828c07..7823c9b 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
@@ -113,4 +113,19 @@ public class RegisterOperatorTest {
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
+ @Test
+ public void testGetChildrenKeysWithNoNodeException() throws Exception {
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+ Assert.assertEquals(0, registerOperator.getChildrenKeys(path).size());
+ }
+
+ @Test
+ public void testNoNodeException() throws Exception {
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+ registerOperator.persistEphemeral(path, "test");
+ registerOperator.remove(path);
+ }
+
}
\ No newline at end of file