This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6c3070fc11324898de0d4c2e16e206006f022e2e Author: xyuanlu <[email protected]> AuthorDate: Thu Jan 12 09:39:21 2023 -0800 Meta client - implement CRUD for zkMetaClient Add basic CRUD for ZkMetaClient and test --- .../helix/metaclient/api/MetaClientInterface.java | 39 +++++- .../constants/MetaClientBadVersionException.java | 20 +++ .../metaclient/constants/MetaClientException.java | 19 ++- ...tion.java => MetaClientInterruptException.java} | 17 ++- ...ception.java => MetaClientNoNodeException.java} | 20 ++- ...eption.java => MetaClientTimeoutException.java} | 18 ++- .../helix/metaclient/impl/zk/ZkMetaClient.java | 146 ++++++++++++++++---- .../impl/zk/factory/ZkMetaClientConfig.java | 2 +- .../helix/metaclient/impl/zk/TestZkMetaClient.java | 149 ++++++++++++++++++++- 9 files changed, 387 insertions(+), 43 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index 5118f6359..b00993b97 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -22,6 +22,9 @@ package org.apache.helix.metaclient.api; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.helix.metaclient.constants.MetaClientInterruptException; +import org.apache.helix.metaclient.constants.MetaClientTimeoutException; + public interface MetaClientInterface<T> { @@ -34,9 +37,21 @@ public interface MetaClientInterface<T> { // An ephemeral node cannot have sub entry. PERSISTENT, - // The node will not be automatically deleted when the last sub-entry of the node is deleted. + // For metadata storage that has hierarchical key space (e.g. ZK), the node will be + // automatically deleted at some point in the future if the last child of the node is deleted. + // For metadata storage that has non-hierarchical key space (e.g. etcd), the node will be + // automatically deleted at some point in the future if the last entry that has the prefix + // is deleted. // The node is an ephemeral node. - CONTAINER + CONTAINER, + + // For metadata storage that has hierarchical key space (e.g. ZK) If the entry is not modified + // within the TTL and has no children it will become a candidate to be deleted by the server + // at some point in the future. + // For metadata storage that has non-hierarchical key space (e.g. etcd) If the entry is not modified + // within the TTL, it will become a candidate to be deleted by the server at some point in the + // future. + TTL } enum ConnectState { @@ -71,6 +86,11 @@ public interface MetaClientInterface<T> { public int getVersion() { return _version; } + + public Stat (EntryMode mode, int version) { + _version = version; + _entryMode = mode; + } } //synced CRUD API @@ -143,7 +163,7 @@ public interface MetaClientInterface<T> { * @eturn Return a list of children keys. Return direct child name only for hierarchical key * space, return the whole sub key for non-hierarchical key space. */ - List<String> getDirestChildrenKeys(final String key); + List<String> getDirectChildrenKeys(final String key); /** * Return the number of children for the given keys. @@ -152,7 +172,7 @@ public interface MetaClientInterface<T> { * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would * be a prefix key. */ - int countDirestChildren(final String key); + int countDirectChildren(final String key); /** * Remove the entry associated with the given key. @@ -332,9 +352,14 @@ public interface MetaClientInterface<T> { /** * Maintains a connection with underlying metadata service based on config params. Connection * created by this method will be used to perform CRUD operations on metadata service. - * @return True if connection is successfully established. - */ - boolean connect(); + * @throws MetaClientInterruptException + * if the connection timed out due to thread interruption + * @throws MetaClientTimeoutException + * if the connection timed out + * @throws IllegalStateException + * if already connected or the connection is already closed explicitly + */ + void connect(); /** * Disconnect from server explicitly. diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java new file mode 100644 index 000000000..4a7f4a0b2 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java @@ -0,0 +1,20 @@ +package org.apache.helix.metaclient.constants; + +public final class MetaClientBadVersionException extends MetaClientException { + public MetaClientBadVersionException() { + super(); + } + + public MetaClientBadVersionException(String message, Throwable cause) { + super(message, cause); + } + + public MetaClientBadVersionException(String message) { + super(message); + } + + public MetaClientBadVersionException(Throwable cause) { + super(cause); + } + +} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java index cf8a07c56..5ace636e3 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java @@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants; * under the License. */ -public final class MetaClientException { -} +public class MetaClientException extends RuntimeException { + public MetaClientException() { + super(); + } + + public MetaClientException(String message, Throwable cause) { + super(message, cause); + } + + public MetaClientException(String message) { + super(message); + } + + public MetaClientException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java similarity index 68% copy from meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java copy to meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java index cf8a07c56..3f52e3ad9 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java @@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants; * under the License. */ -public final class MetaClientException { +public final class MetaClientInterruptException extends MetaClientException { + public MetaClientInterruptException() { + super(); + } + + public MetaClientInterruptException(String message, Throwable cause) { + super(message, cause); + } + + public MetaClientInterruptException(String message) { + super(message); + } + + public MetaClientInterruptException(Throwable cause) { + super(cause); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java similarity index 68% copy from meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java copy to meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java index cf8a07c56..b9cfb4b9f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java @@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants; * under the License. */ -public final class MetaClientException { -} +public final class MetaClientNoNodeException extends MetaClientException { + public MetaClientNoNodeException() { + super(); + } + + public MetaClientNoNodeException(String message, Throwable cause) { + super(message, cause); + } + + public MetaClientNoNodeException(String message) { + super(message); + } + + public MetaClientNoNodeException(Throwable cause) { + super(cause); + } + +} \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java similarity index 68% copy from meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java copy to meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java index cf8a07c56..6f1c381a4 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java @@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants; * under the License. */ -public final class MetaClientException { +public final class MetaClientTimeoutException extends MetaClientException { + public MetaClientTimeoutException() { + super(); + } + + public MetaClientTimeoutException(String message, Throwable cause) { + super(message, cause); + } + + public MetaClientTimeoutException(String message) { + super(message); + } + + public MetaClientTimeoutException(Throwable cause) { + super(cause); + } + } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index b3270ad81..35529454a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -30,76 +30,133 @@ import org.apache.helix.metaclient.api.DataUpdater; import org.apache.helix.metaclient.api.DirectChildChangeListener; import org.apache.helix.metaclient.api.DirectChildSubscribeResult; import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; +import org.apache.helix.metaclient.constants.MetaClientBadVersionException; +import org.apache.helix.metaclient.constants.MetaClientException; +import org.apache.helix.metaclient.constants.MetaClientInterruptException; +import org.apache.helix.metaclient.constants.MetaClientNoNodeException; +import org.apache.helix.metaclient.constants.MetaClientTimeoutException; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; +import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; +import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.server.EphemeralType; -public class ZkMetaClient implements MetaClientInterface { +public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { private final ZkClient _zkClient; + private final int _connectionTimeout; public ZkMetaClient(ZkMetaClientConfig config) { - _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(), - (int) config.getSessionTimeoutInMillis()), - (int) config.getConnectionInitTimeoutInMillis(), -1 /*operationRetryTimeout*/, - config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), - config.getMonitorInstanceName(), config.getMonitorRootPathOnly()); + _connectionTimeout = (int) config.getConnectionInitTimeoutInMillis(); + _zkClient = new ZkClient( + new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), + _connectionTimeout, -1 /*operationRetryTimeout*/, config.getZkSerializer(), + config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(), + config.getMonitorRootPathOnly(), false); } @Override - public void create(String key, Object data) { - + public void create(String key, T data) { + // TODO: This function is implemented only for test. It does not have proper error handling + _zkClient.create(key, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Override - public void create(String key, Object data, EntryMode mode) { + public void create(String key, T data, EntryMode mode) { } @Override - public void set(String key, Object data, int version) { - + public void set(String key, T data, int version) { + try { + _zkClient.writeData(key, data, version); + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } } @Override - public Object update(String key, DataUpdater updater) { - return null; + public T update( String key, DataUpdater<T> updater) { + org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); + // TODO: add retry logic for ZkBadVersionException. + try { + T oldData = _zkClient.readData(key, stat); + T newData = updater.update(oldData); + set(key, newData, stat.getVersion()); + return newData; + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } } @Override public Stat exists(String key) { - return null; + org.apache.zookeeper.data.Stat zkStats; + try { + zkStats = _zkClient.getStat(key); + if (zkStats == null) { + return null; + } + return new Stat(convertZkEntryMode(zkStats.getEphemeralOwner()), zkStats.getVersion()); + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } } @Override - public Object get(String key) { - return null; + public T get(String key) { + return _zkClient.readData(key, true); } @Override - public List<String> getDirestChildrenKeys(String key) { + public List<OpResult> transactionOP(Iterable<Op> ops) { return null; } @Override - public int countDirestChildren(String key) { - return 0; + public List<String> getDirectChildrenKeys(String key) { + try { + return _zkClient.getChildren(key); + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } + } + + @Override + public int countDirectChildren(String key) { + return _zkClient.countChildren(key); } @Override public boolean delete(String key) { - return false; + try { + return _zkClient.delete(key); + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } } @Override public boolean recursiveDelete(String key) { - return false; + _zkClient.deleteRecursively(key); + return true; } + // Zookeeper execute async callbacks at zookeeper server side. In our first version of + // implementation, we will keep this behavior. + // In later version, we may consider creating a thread pool to execute registered callbacks. + // However, this will change metaclient from stateless to stateful. @Override public void setAsyncExecPoolSize(int poolSize) { @@ -156,13 +213,19 @@ public class ZkMetaClient implements MetaClientInterface { } @Override - public boolean connect() { - return false; + public void connect() { + // TODO: throws IllegalStateException when already connected + try { + _zkClient.connect(_connectionTimeout, _zkClient); + } catch (ZkException e) { + throw translateZkExceptionToMetaclientException(e); + } } @Override public void disconnect() { - + // TODO: This is a temp impl for test only. no proper interrupt handling and error handling. + _zkClient.close(); } @Override @@ -246,8 +309,8 @@ public class ZkMetaClient implements MetaClientInterface { } @Override - public List<OpResult> transactionOP(Iterable iterable) { - return null; + public void close() { + disconnect(); } /** @@ -301,4 +364,35 @@ public class ZkMetaClient implements MetaClientInterface { return _listener.hashCode(); } } + + private static MetaClientException translateZkExceptionToMetaclientException(ZkException e) { + if (e instanceof ZkNodeExistsException) { + return new MetaClientNoNodeException(e); + } else if (e instanceof ZkBadVersionException) { + return new MetaClientBadVersionException(e); + } else if (e instanceof ZkTimeoutException) { + return new MetaClientTimeoutException(e); + } else if (e instanceof ZkInterruptedException) { + return new MetaClientInterruptException(e); + } else { + return new MetaClientException(e); + } + } + + private static EntryMode convertZkEntryMode(long ephemeralOwner) { + EphemeralType zkEphemeralType = EphemeralType.get(ephemeralOwner); + switch (zkEphemeralType) { + case VOID: + return EntryMode.PERSISTENT; + case CONTAINER: + return EntryMode.CONTAINER; + case NORMAL: + return EntryMode.EPHEMERAL; + // TODO: TTL is not supported now. + //case TTL: + // return EntryMode.TTL; + default: + throw new IllegalArgumentException(zkEphemeralType + " is not supported."); + } + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java index c4190fc84..d9292f846 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java @@ -128,7 +128,7 @@ public class ZkMetaClientConfig extends MetaClientConfig { } @Override - public MetaClientConfig build() { + public ZkMetaClientConfig build() { if (_zkSerializer == null) { _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 4c74fe847..0eab0315e 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -21,18 +21,31 @@ package org.apache.helix.metaclient.impl.zk; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.commons.io.FileUtils; +import org.apache.helix.metaclient.api.DataUpdater; +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.constants.MetaClientException; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; import org.apache.helix.zookeeper.zkclient.ZkServer; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT; + public class TestZkMetaClient { private static final String ZK_ADDR = "localhost:2183"; private ZkServer _zkServer; + private static final String ENTRY_STRING_VALUE = "test-value"; @BeforeClass public void prepare() { @@ -40,9 +53,139 @@ public class TestZkMetaClient { _zkServer = startZkServer(ZK_ADDR); } + @AfterClass + public void cleanUp() { + _zkServer.shutdown(); + } + @Test - public void dummyTest() { - Assert.assertEquals(1+1, 2); + public void testGet() { + final String key = "/TestZkMetaClient_testGet"; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + String value; + zkMetaClient.create(key, ENTRY_STRING_VALUE); + String dataValue = zkMetaClient.get(key); + Assert.assertEquals(dataValue, ENTRY_STRING_VALUE); + + value = zkMetaClient.get(key + "/a/b/c"); + Assert.assertNull(value); + + zkMetaClient.delete(key); + + value = zkMetaClient.get(key); + Assert.assertNull(value); + } + } + + @Test + public void testSet() { + final String key = "/TestZkMetaClient_testSet"; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + zkMetaClient.create(key, ENTRY_STRING_VALUE); + String testValueV1 = ENTRY_STRING_VALUE + "-v1"; + String testValueV2 = ENTRY_STRING_VALUE + "-v2"; + + // test set() with no expected version and validate result. + zkMetaClient.set(key, testValueV1, -1); + Assert.assertEquals(zkMetaClient.get(key), testValueV1); + MetaClientInterface.Stat entryStat = zkMetaClient.exists(key); + Assert.assertEquals(entryStat.getVersion(), 1); + Assert.assertEquals(entryStat.getEntryType().name(), PERSISTENT.name()); + + // test set() with expected version and validate result and new version number + zkMetaClient.set(key, testValueV2, 1); + entryStat = zkMetaClient.exists(key); + Assert.assertEquals(zkMetaClient.get(key), testValueV2); + Assert.assertEquals(entryStat.getVersion(), 2); + + // test set() with a wrong version + try { + zkMetaClient.set(key, "test-node-changed", 10); + Assert.fail("No reach."); + } catch (MetaClientException ex) { + Assert.assertEquals(ex.getClass().getName(), + "org.apache.helix.metaclient.constants.MetaClientBadVersionException"); + } + zkMetaClient.delete(key); + } + } + + @Test + public void testUpdate() { + final String key = "/TestZkMetaClient_testUpdate"; + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); + try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) { + zkMetaClient.connect(); + int initValue = 3; + zkMetaClient.create(key, initValue); + MetaClientInterface.Stat entryStat = zkMetaClient.exists(key); + Assert.assertEquals(entryStat.getVersion(), 0); + + // test update() and validate entry value and version + Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() { + @Override + public Integer update(Integer currentData) { + return currentData + 1; + } + }); + Assert.assertEquals((int) newData, (int) initValue + 1); + + entryStat = zkMetaClient.exists(key); + Assert.assertEquals(entryStat.getVersion(), 1); + + newData = zkMetaClient.update(key, new DataUpdater<Integer>() { + + @Override + public Integer update(Integer currentData) { + return currentData + 1; + } + }); + + entryStat = zkMetaClient.exists(key); + Assert.assertEquals(entryStat.getVersion(), 2); + Assert.assertEquals((int) newData, (int) initValue + 2); + zkMetaClient.delete(key); + } + } + + @Test + public void testGetAndCountChildrenAndRecursiveDelete() { + final String key = "/TestZkMetaClient_testGetAndCountChildren"; + List<String> childrenNames = Arrays.asList("/c1", "/c2", "/c3"); + + // create child nodes and validate retrieved children count and names + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + zkMetaClient.create(key, ENTRY_STRING_VALUE); + Assert.assertEquals(zkMetaClient.countDirectChildren(key), 0); + for (String str : childrenNames) { + zkMetaClient.create(key + str, ENTRY_STRING_VALUE); + } + + List<String> retrievedChildrenNames = zkMetaClient.getDirectChildrenKeys(key); + Assert.assertEquals(retrievedChildrenNames.size(), childrenNames.size()); + Set<String> childrenNameSet = new HashSet<>(childrenNames); + for (String str : retrievedChildrenNames) { + Assert.assertTrue(childrenNameSet.contains("/" + str)); + } + + // recursive delete and validate + Assert.assertEquals(zkMetaClient.countDirectChildren(key), childrenNames.size()); + Assert.assertNotNull(zkMetaClient.exists(key)); + zkMetaClient.recursiveDelete(key); + Assert.assertNull(zkMetaClient.exists(key)); + } + } + + + + private static ZkMetaClient<String> createZkMetaClient() { + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); + return new ZkMetaClient<>(config); } private static ZkServer startZkServer(final String zkAddress) { @@ -62,8 +205,8 @@ public class TestZkMetaClient { }; int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1)); - ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); System.out.println("Starting ZK server at " + zkAddress); + ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); zkServer.start(); return zkServer; }
