This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 93188b7 add ZkServer for UT (#1499)
93188b7 is described below
commit 93188b7da1eddfc58581487c236e8392a3345c23
Author: Tboy <[email protected]>
AuthorDate: Tue Dec 17 16:13:07 2019 +0800
add ZkServer for UT (#1499)
* add ZkServer for UT
* Add FileUtilsTest.java , the unit test for FileUtils (#1493) (#1)
* updates for reference ZkServer
---
.../common/queue/TaskQueueImplTest.java | 7 +-
.../common/zk/StandaloneZKServerForTest.java | 98 ------------
.../apache/dolphinscheduler/common/zk/TestZk.java | 43 ++++++
.../dolphinscheduler/common/zk/ZKServer.java | 165 +++++++++++++++++++++
4 files changed, 212 insertions(+), 101 deletions(-)
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java
index efee627..14e90eb 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.common.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.zk.StandaloneZKServerForTest;
+import org.apache.dolphinscheduler.common.zk.ZKServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals;
/**
* task queue test
*/
-public class TaskQueueImplTest extends StandaloneZKServerForTest {
+public class TaskQueueImplTest {
private static final Logger logger =
LoggerFactory.getLogger(TaskQueueImplTest.class);
@@ -43,7 +43,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest {
@Before
public void before(){
- super.before();
+ ZKServer.start();
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
@@ -57,6 +57,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest {
public void after(){
//clear all data
tasksQueue.delete();
+ ZKServer.stop();
}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java
deleted file mode 100644
index fed9ebb..0000000
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.zk;
-
-import java.io.File;
-import java.util.Properties;
-
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * just for test
- */
-public class StandaloneZKServerForTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(StandaloneZKServerForTest.class);
-
- private static volatile ZooKeeperServerMain zkServer = null;
-
-
- @Before
- public void before() {
- logger.info("standalone zookeeper server for test service start ");
-
- ThreadPoolExecutors.getInstance().execute(new Runnable() {
- @Override
- public void run() {
-
- //delete zk data dir ?
- File zkFile = new File(System.getProperty("java.io.tmpdir"),
"zookeeper");
-
- startStandaloneServer("2000", zkFile.getAbsolutePath(),
"2181", "10", "5");
- }
- });
-
- }
-
-
- /**
- * start zk server
- * @param tickTime zookeeper ticktime
- * @param dataDir zookeeper data dir
- * @param clientPort zookeeper client port
- * @param initLimit zookeeper init limit
- * @param syncLimit zookeeper sync limit
- */
- private void startStandaloneServer(String tickTime, String dataDir, String
clientPort, String initLimit, String syncLimit) {
- Properties props = new Properties();
- props.setProperty("tickTime", tickTime);
- props.setProperty("dataDir", dataDir);
- props.setProperty("clientPort", clientPort);
- props.setProperty("initLimit", initLimit);
- props.setProperty("syncLimit", syncLimit);
-
- QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
- try {
- quorumConfig.parseProperties(props);
-
- if(zkServer == null ){
-
- synchronized (StandaloneZKServerForTest.class){
- if(zkServer == null ){
- zkServer = new ZooKeeperServerMain();
- final ServerConfig config = new ServerConfig();
- config.readFrom(quorumConfig);
- zkServer.runFromConfig(config);
- }
- }
-
- }
-
- } catch (Exception e) {
- logger.error("start standalone server fail!", e);
- }
- }
-
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
new file mode 100644
index 0000000..5c3db2d
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.zk;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Minimal example of driving the embedded {@link ZKServer} from a JUnit test:
+ * start it before each test, use it, and stop it afterwards.
+ */
+public class TestZk {
+
+    /** Brings the embedded server up before each test. */
+    @Before
+    public void before() {
+        ZKServer.start();
+    }
+
+    /** Checks that the server reports itself as started. */
+    @Test
+    public void test() {
+        final boolean running = ZKServer.isStarted();
+        Assert.assertTrue(running);
+    }
+
+    /** Tears the embedded server down after each test. */
+    @After
+    public void after() {
+        ZKServer.stop();
+    }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
new file mode 100644
index 0000000..5aba9fd
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Embedded single-node ZooKeeper server, just for tests.
+ *
+ * <p>{@link #start()} launches the server on {@link #DEFAULT_ZK_TEST_PORT} in a background
+ * thread with a freshly named temporary data directory; {@link #stop()} shuts it down and
+ * deletes that directory. Only one server may exist at a time, and the start/stop
+ * implementations are serialized on the class lock.
+ */
+public class ZKServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+    /** The server created by the last successful start call, or {@code null} when none exists. */
+    private static volatile PublicZooKeeperServerMain zkServer = null;
+
+    /** Client port used by {@link #start()}. */
+    public static final int DEFAULT_ZK_TEST_PORT = 22181;
+
+    /** Connect string matching {@link #DEFAULT_ZK_TEST_PORT}. */
+    public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
+
+    /** How long to wait for the freshly launched server to accept a client connection. */
+    private static final int STARTUP_WAIT_SECONDS = 10;
+
+    /** Data directory of the current server; deleted again by {@link #stop()}. */
+    private static String dataDir = null;
+
+    /** True once a probe client has connected to the server; cleared again on stop. */
+    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    /**
+     * Starts the server on {@link #DEFAULT_ZK_TEST_PORT}.
+     *
+     * <p>Failures are logged rather than thrown; use {@link #isStarted()} to check the outcome.
+     */
+    public static void start() {
+        try {
+            startLocalZkServer(DEFAULT_ZK_TEST_PORT);
+        } catch (Exception e) {
+            // Pass the exception as the last argument so its stack trace is logged as well.
+            logger.error("Failed to start ZK", e);
+        }
+    }
+
+    /**
+     * @return {@code true} if the server accepted a probe connection after the last start
+     *         and has not been stopped since
+     */
+    public static boolean isStarted() {
+        return isStarted.get();
+    }
+
+    /**
+     * Re-declares the {@code ZooKeeperServerMain} entry points used by this class as
+     * {@code public}; both overrides delegate straight to the superclass.
+     */
+    static class PublicZooKeeperServerMain extends ZooKeeperServerMain {
+
+        @Override
+        public void initializeAndRun(String[] args)
+                throws QuorumPeerConfig.ConfigException, IOException {
+            super.initializeAndRun(args);
+        }
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Starts a local Zk instance with a generated empty data directory
+     *
+     * @param port The port to listen on
+     * @throws RuntimeException if a server has already been started and not stopped
+     */
+    public static void startLocalZkServer(final int port) {
+        startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath()
+                + File.separator + "test-" + System.currentTimeMillis());
+    }
+
+    /**
+     * Starts a local Zk instance and waits up to {@link #STARTUP_WAIT_SECONDS} seconds for it
+     * to accept a client connection. {@link #isStarted()} only becomes {@code true} when that
+     * probe connection succeeds.
+     *
+     * @param port        The port to listen on
+     * @param dataDirPath The path for the Zk data directory
+     * @throws RuntimeException if a server has already been started and not stopped
+     */
+    private static synchronized void startLocalZkServer(final int port, final String dataDirPath) {
+        if (zkServer != null) {
+            throw new RuntimeException("Zookeeper server is already started!");
+        }
+        logger.info("Zookeeper data path : {} ", dataDirPath);
+
+        // The thread gets its own reference: the static field may already have been cleared
+        // by stop() by the time the thread body runs.
+        final PublicZooKeeperServerMain server = new PublicZooKeeperServerMain();
+        final String[] args = new String[]{Integer.toString(port), dataDirPath};
+        zkServer = server;
+        dataDir = dataDirPath;
+
+        Thread init = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.initializeAndRun(args);
+                } catch (QuorumPeerConfig.ConfigException | IOException e) {
+                    logger.warn("Caught exception while starting ZK", e);
+                }
+            }
+        }, "init-zk-thread");
+        // Daemon, so a test run that never reaches stop() cannot keep the JVM alive.
+        init.setDaemon(true);
+        init.start();
+
+        // Probe the port that was actually requested, not the default one.
+        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
+                .connectString("localhost:" + port)
+                .retryPolicy(new ExponentialBackoffRetry(10, 100))
+                .sessionTimeoutMs(1000 * 30)
+                .connectionTimeoutMs(1000 * 30)
+                .build();
+
+        boolean connected = false;
+        try {
+            // A Curator client does nothing until start() is called; without it the wait
+            // below could never succeed and would always run into its timeout.
+            zkClient.start();
+            connected = zkClient.blockUntilConnected(STARTUP_WAIT_SECONDS, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted while waiting for ZK to accept connections", e);
+        } finally {
+            zkClient.close();
+        }
+
+        if (connected) {
+            isStarted.set(true);
+            logger.info("zk server started");
+        } else {
+            // Keep zkServer/dataDir set so that stop() can still clean up.
+            logger.error("zk server did not accept a connection on port {} within {} seconds",
+                    port, STARTUP_WAIT_SECONDS);
+        }
+    }
+
+    /**
+     * Stops a local Zk instance, deleting its data directory.
+     *
+     * <p>Failures are logged rather than thrown.
+     */
+    public static void stop() {
+        try {
+            stopLocalZkServer(true);
+        } catch (Exception e) {
+            logger.error("Failed to stop ZK ", e);
+        }
+    }
+
+    /**
+     * Stops a local Zk instance. Does nothing when no server has been started.
+     *
+     * @param deleteDataDir Whether or not to delete the data directory
+     * @throws RuntimeException wrapping any failure of the shutdown or the directory removal
+     */
+    private static synchronized void stopLocalZkServer(final boolean deleteDataDir) {
+        if (zkServer == null) {
+            return;
+        }
+        try {
+            zkServer.shutdown();
+            if (deleteDataDir) {
+                org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
+            }
+        } catch (Exception e) {
+            logger.warn("Caught exception while stopping ZK server", e);
+            throw new RuntimeException(e);
+        } finally {
+            // Reset the bookkeeping even on failure so that isStarted() never reports a
+            // server that is gone and a later start() is not rejected as "already started".
+            zkServer = null;
+            dataDir = null;
+            isStarted.set(false);
+        }
+    }
+}
\ No newline at end of file