This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 753ed58 [Bug-3140]fix the deadlock between start and stop of ZKServer
(#3141)
753ed58 is described below
commit 753ed58fb9f15f85240918fe6457cf9015ee5101
Author: tswstarplanet <[email protected]>
AuthorDate: Mon Jul 13 18:52:33 2020 +0800
[Bug-3140]fix the deadlock between start and stop of ZKServer (#3141)
* [Bug-3140]fix the deadlock between start and stop of ZKServer
* use Log framework to print information
* fix code smells; add path prefix of embedded zk server
* Update
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
Co-authored-by: Yichao Yang <[email protected]>
* optimize the code
Co-authored-by: Yichao Yang <[email protected]>
---
.../dolphinscheduler/service/zk/ZKServer.java | 119 ++++++++++++---------
.../dolphinscheduler/service/zk/ZKServerTest.java | 38 +++++--
2 files changed, 103 insertions(+), 54 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index 3cdc9ab..c7a53eb 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.service.zk;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -34,44 +35,62 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ZKServer {
private static final Logger logger =
LoggerFactory.getLogger(ZKServer.class);
- private static volatile PublicZooKeeperServerMain zkServer = null;
-
public static final int DEFAULT_ZK_TEST_PORT = 2181;
- private static String dataDir = null;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private PublicZooKeeperServerMain zooKeeperServerMain = null;
+
+ private int port;
- private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+ private String dataDir = null;
+
+ private String prefix;
public static void main(String[] args) {
- if(!isStarted()){
- ZKServer.start();
-
- /**
- * register hooks, which are called before the process exits
- */
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- stop();
- }
- }));
- }else{
- logger.info("zk server aleady started");
+ ZKServer zkServer;
+ if (args.length == 0) {
+ zkServer = new ZKServer();
+ } else if (args.length == 1){
+ zkServer = new ZKServer(Integer.valueOf(args[0]), "");
+ } else {
+ zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
+ }
+ zkServer.registerHook();
+ zkServer.start();
+ }
+
+ public ZKServer() {
+ this(DEFAULT_ZK_TEST_PORT, "");
+ }
+
+ public ZKServer(int port, String prefix) {
+ this.port = port;
+ if (prefix != null && prefix.contains("/")) {
+ throw new IllegalArgumentException("The prefix of path may not
have '/'");
}
+ this.prefix = (prefix == null ? null : prefix.trim());
+ }
+
+ private void registerHook() {
+ /**
+ * register hooks, which are called before the process exits
+ */
+ Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
/**
* start service
*/
- public static void start() {
+ public void start() {
try {
- startLocalZkServer(DEFAULT_ZK_TEST_PORT);
+ startLocalZkServer(port);
} catch (Exception e) {
- logger.error("Failed to start ZK: " + e);
+ logger.error("Failed to start ZK ", e);
}
}
- public static boolean isStarted(){
+ public boolean isStarted(){
return isStarted.get();
}
@@ -94,8 +113,12 @@ public class ZKServer {
*
* @param port The port to listen on
*/
- public static void startLocalZkServer(final int port) {
- String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data";
+ public void startLocalZkServer(final int port) {
+ String zkDataDir = System.getProperty("user.dir") +
(StringUtils.isEmpty(prefix) ? StringUtils.EMPTY : ("/" + prefix)) +
"/zookeeper_data";
+ File file = new File(zkDataDir);
+ if (file.exists()) {
+ logger.warn("The path of zk server exists");
+ }
logger.info("zk server starting, data dir path:{}" , zkDataDir);
startLocalZkServer(port, zkDataDir,
ZooKeeperServer.DEFAULT_TICK_TIME,"60");
}
@@ -108,31 +131,29 @@ public class ZKServer {
* @param tickTime zk tick time
* @param maxClientCnxns zk max client connections
*/
- private static synchronized void startLocalZkServer(final int port, final
String dataDirPath,final int tickTime,String maxClientCnxns) {
- if (zkServer != null) {
- throw new RuntimeException("Zookeeper server is already started!");
- }
- zkServer = new PublicZooKeeperServerMain();
- logger.info("Zookeeper data path : {} ", dataDirPath);
- dataDir = dataDirPath;
- final String[] args = new String[]{Integer.toString(port),
dataDirPath, Integer.toString(tickTime), maxClientCnxns};
+ private void startLocalZkServer(final int port, final String
dataDirPath,final int tickTime,String maxClientCnxns) {
+ if (isStarted.compareAndSet(false, true)) {
+ zooKeeperServerMain = new PublicZooKeeperServerMain();
+ logger.info("Zookeeper data path : {} ", dataDirPath);
+ dataDir = dataDirPath;
+ final String[] args = new String[]{Integer.toString(port),
dataDirPath, Integer.toString(tickTime), maxClientCnxns};
- try {
- logger.info("Zookeeper server started ");
- isStarted.compareAndSet(false, true);
-
- zkServer.initializeAndRun(args);
- } catch (QuorumPeerConfig.ConfigException e) {
- logger.warn("Caught exception while starting ZK", e);
- } catch (IOException e) {
- logger.warn("Caught exception while starting ZK", e);
+ try {
+ logger.info("Zookeeper server started ");
+ isStarted.compareAndSet(false, true);
+
+ zooKeeperServerMain.initializeAndRun(args);
+ } catch (QuorumPeerConfig.ConfigException | IOException e) {
+ logger.warn("Caught exception while starting ZK", e);
+ throw new RuntimeException(e);
+ }
}
}
/**
* Stops a local Zk instance, deleting its data directory
*/
- public static void stop() {
+ public void stop() {
try {
stopLocalZkServer(true);
logger.info("zk server stopped");
@@ -147,19 +168,21 @@ public class ZKServer {
*
* @param deleteDataDir Whether or not to delete the data directory
*/
- private static synchronized void stopLocalZkServer(final boolean
deleteDataDir) {
- if (zkServer != null) {
+ private void stopLocalZkServer(final boolean deleteDataDir) {
+ if (isStarted.compareAndSet(true, false)) {
try {
- zkServer.shutdown();
- zkServer = null;
+ if (zooKeeperServerMain == null) {
+ return;
+ }
+ zooKeeperServerMain.shutdown();
+ zooKeeperServerMain = null;
if (deleteDataDir) {
org.apache.commons.io.FileUtils.deleteDirectory(new
File(dataDir));
}
- isStarted.compareAndSet(true, false);
} catch (Exception e) {
logger.warn("Caught exception while stopping ZK server", e);
throw new RuntimeException(e);
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
index 42b942b..10be65e 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java
@@ -18,18 +18,44 @@ package org.apache.dolphinscheduler.service.zk;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-// ZKServer is a process, can't unit test
-public class ZKServerTest {
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+public class ZKServerTest {
+ private static final Logger log =
LoggerFactory.getLogger(ZKServerTest.class);
@Test
- public void isStarted() {
- Assert.assertEquals(false, ZKServer.isStarted());
+ public void testRunWithDefaultPort() {
+ AtomicReference<ZKServer> zkServer = new AtomicReference<>();
+ new Thread(() -> {
+ zkServer.set(new ZKServer());
+ zkServer.get().start();
+ }).start();
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ Assert.assertEquals(true, zkServer.get().isStarted());
+ } catch (InterruptedException e) {
+ log.error("Thread interrupted", e);
+ }
+ zkServer.get().stop();
}
@Test
- public void stop() {
- ZKServer.stop();
+ public void testRunWithCustomPort() {
+ AtomicReference<ZKServer> zkServer = new AtomicReference<>();
+ new Thread(() -> {
+ zkServer.set(new ZKServer(2183, null));
+ zkServer.get().start();
+ }).start();
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ Assert.assertEquals(true, zkServer.get().isStarted());
+ } catch (InterruptedException e) {
+ log.error("Thread interrupted", e);
+ }
+ zkServer.get().stop();
}
}
\ No newline at end of file