This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 34a74343f Meta client - implement CRUD for zkMetaClient
34a74343f is described below
commit 34a74343f93f4e67c59ef33f75684f288a27cb23
Author: xyuanlu <[email protected]>
AuthorDate: Thu Jan 12 09:39:21 2023 -0800
Meta client - implement CRUD for zkMetaClient
Add basic CRUD for ZkMetaClient and test
---
.../helix/metaclient/api/MetaClientInterface.java | 39 +++++-
.../constants/MetaClientBadVersionException.java | 20 +++
.../metaclient/constants/MetaClientException.java | 19 ++-
...tion.java => MetaClientInterruptException.java} | 17 ++-
...ception.java => MetaClientNoNodeException.java} | 20 ++-
...eption.java => MetaClientTimeoutException.java} | 18 ++-
.../helix/metaclient/impl/zk/ZkMetaClient.java | 146 ++++++++++++++++----
.../impl/zk/factory/ZkMetaClientConfig.java | 2 +-
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 149 ++++++++++++++++++++-
9 files changed, 387 insertions(+), 43 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5118f6359..b00993b97 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -22,6 +22,9 @@ package org.apache.helix.metaclient.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.metaclient.constants.MetaClientInterruptException;
+import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
+
public interface MetaClientInterface<T> {
@@ -34,9 +37,21 @@ public interface MetaClientInterface<T> {
// An ephemeral node cannot have sub entry.
PERSISTENT,
- // The node will not be automatically deleted when the last sub-entry of
the node is deleted.
+ // For metadata storage that has hierarchical key space (e.g. ZK), the
node will be
+ // automatically deleted at some point in the future if the last child of
the node is deleted.
+ // For metadata storage that has non-hierarchical key space (e.g. etcd),
the node will be
+ // automatically deleted at some point in the future if the last entry
that has the prefix
+ // is deleted.
// The node is an ephemeral node.
- CONTAINER
+ CONTAINER,
+
+ // For metadata storage that has hierarchical key space (e.g. ZK) If the
entry is not modified
+ // within the TTL and has no children it will become a candidate to be
deleted by the server
+ // at some point in the future.
+ // For metadata storage that has non-hierarchical key space (e.g. etcd) If
the entry is not modified
+ // within the TTL, it will become a candidate to be deleted by the server
at some point in the
+ // future.
+ TTL
}
enum ConnectState {
@@ -71,6 +86,11 @@ public interface MetaClientInterface<T> {
public int getVersion() {
return _version;
}
+
+ public Stat (EntryMode mode, int version) {
+ _version = version;
+ _entryMode = mode;
+ }
}
//synced CRUD API
@@ -143,7 +163,7 @@ public interface MetaClientInterface<T> {
* @eturn Return a list of children keys. Return direct child name only for
hierarchical key
* space, return the whole sub key for non-hierarchical key space.
*/
- List<String> getDirestChildrenKeys(final String key);
+ List<String> getDirectChildrenKeys(final String key);
/**
* Return the number of children for the given keys.
@@ -152,7 +172,7 @@ public interface MetaClientInterface<T> {
* For metadata storage that has non-hierarchical key space (e.g.
etcd), the key would
* be a prefix key.
*/
- int countDirestChildren(final String key);
+ int countDirectChildren(final String key);
/**
* Remove the entry associated with the given key.
@@ -332,9 +352,14 @@ public interface MetaClientInterface<T> {
/**
* Maintains a connection with underlying metadata service based on config
params. Connection
* created by this method will be used to perform CRUD operations on
metadata service.
- * @return True if connection is successfully established.
- */
- boolean connect();
+ * @throws MetaClientInterruptException
+ * if the connection timed out due to thread interruption
+ * @throws MetaClientTimeoutException
+ * if the connection timed out
+ * @throws IllegalStateException
+ * if already connected or the connection is already closed
explicitly
+ */
+ void connect();
/**
* Disconnect from server explicitly.
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
new file mode 100644
index 000000000..4a7f4a0b2
--- /dev/null
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
@@ -0,0 +1,20 @@
+package org.apache.helix.metaclient.constants;
+
+public final class MetaClientBadVersionException extends MetaClientException {
+ public MetaClientBadVersionException() {
+ super();
+ }
+
+ public MetaClientBadVersionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetaClientBadVersionException(String message) {
+ super(message);
+ }
+
+ public MetaClientBadVersionException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
index cf8a07c56..5ace636e3 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
@@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants;
* under the License.
*/
-public final class MetaClientException {
-}
+public class MetaClientException extends RuntimeException {
+ public MetaClientException() {
+ super();
+ }
+
+ public MetaClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetaClientException(String message) {
+ super(message);
+ }
+
+ public MetaClientException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
similarity index 68%
copy from
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
index cf8a07c56..3f52e3ad9 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
@@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants;
* under the License.
*/
-public final class MetaClientException {
+public final class MetaClientInterruptException extends MetaClientException {
+ public MetaClientInterruptException() {
+ super();
+ }
+
+ public MetaClientInterruptException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetaClientInterruptException(String message) {
+ super(message);
+ }
+
+ public MetaClientInterruptException(Throwable cause) {
+ super(cause);
+ }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
similarity index 68%
copy from
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
index cf8a07c56..b9cfb4b9f 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
@@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants;
* under the License.
*/
-public final class MetaClientException {
-}
+public final class MetaClientNoNodeException extends MetaClientException {
+ public MetaClientNoNodeException() {
+ super();
+ }
+
+ public MetaClientNoNodeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetaClientNoNodeException(String message) {
+ super(message);
+ }
+
+ public MetaClientNoNodeException(Throwable cause) {
+ super(cause);
+ }
+
+}
\ No newline at end of file
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
similarity index 68%
copy from
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
index cf8a07c56..6f1c381a4 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
@@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants;
* under the License.
*/
-public final class MetaClientException {
+public final class MetaClientTimeoutException extends MetaClientException {
+ public MetaClientTimeoutException() {
+ super();
+ }
+
+ public MetaClientTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetaClientTimeoutException(String message) {
+ super(message);
+ }
+
+ public MetaClientTimeoutException(Throwable cause) {
+ super(cause);
+ }
+
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index b3270ad81..35529454a 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -30,76 +30,133 @@ import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.constants.MetaClientBadVersionException;
+import org.apache.helix.metaclient.constants.MetaClientException;
+import org.apache.helix.metaclient.constants.MetaClientInterruptException;
+import org.apache.helix.metaclient.constants.MetaClientNoNodeException;
+import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.EphemeralType;
-public class ZkMetaClient implements MetaClientInterface {
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
private final ZkClient _zkClient;
+ private final int _connectionTimeout;
public ZkMetaClient(ZkMetaClientConfig config) {
- _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(),
- (int) config.getSessionTimeoutInMillis()),
- (int) config.getConnectionInitTimeoutInMillis(), -1
/*operationRetryTimeout*/,
- config.getZkSerializer(), config.getMonitorType(),
config.getMonitorKey(),
- config.getMonitorInstanceName(), config.getMonitorRootPathOnly());
+ _connectionTimeout = (int) config.getConnectionInitTimeoutInMillis();
+ _zkClient = new ZkClient(
+ new ZkConnection(config.getConnectionAddress(), (int)
config.getSessionTimeoutInMillis()),
+ _connectionTimeout, -1 /*operationRetryTimeout*/,
config.getZkSerializer(),
+ config.getMonitorType(), config.getMonitorKey(),
config.getMonitorInstanceName(),
+ config.getMonitorRootPathOnly(), false);
}
@Override
- public void create(String key, Object data) {
-
+ public void create(String key, T data) {
+ // TODO: This function is implemented only for test. It does not have
proper error handling
+ _zkClient.create(key, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
@Override
- public void create(String key, Object data, EntryMode mode) {
+ public void create(String key, T data, EntryMode mode) {
}
@Override
- public void set(String key, Object data, int version) {
-
+ public void set(String key, T data, int version) {
+ try {
+ _zkClient.writeData(key, data, version);
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
}
@Override
- public Object update(String key, DataUpdater updater) {
- return null;
+ public T update( String key, DataUpdater<T> updater) {
+ org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
+ // TODO: add retry logic for ZkBadVersionException.
+ try {
+ T oldData = _zkClient.readData(key, stat);
+ T newData = updater.update(oldData);
+ set(key, newData, stat.getVersion());
+ return newData;
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
}
@Override
public Stat exists(String key) {
- return null;
+ org.apache.zookeeper.data.Stat zkStats;
+ try {
+ zkStats = _zkClient.getStat(key);
+ if (zkStats == null) {
+ return null;
+ }
+ return new Stat(convertZkEntryMode(zkStats.getEphemeralOwner()),
zkStats.getVersion());
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
}
@Override
- public Object get(String key) {
- return null;
+ public T get(String key) {
+ return _zkClient.readData(key, true);
}
@Override
- public List<String> getDirestChildrenKeys(String key) {
+ public List<OpResult> transactionOP(Iterable<Op> ops) {
return null;
}
@Override
- public int countDirestChildren(String key) {
- return 0;
+ public List<String> getDirectChildrenKeys(String key) {
+ try {
+ return _zkClient.getChildren(key);
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
+ }
+
+ @Override
+ public int countDirectChildren(String key) {
+ return _zkClient.countChildren(key);
}
@Override
public boolean delete(String key) {
- return false;
+ try {
+ return _zkClient.delete(key);
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
}
@Override
public boolean recursiveDelete(String key) {
- return false;
+ _zkClient.deleteRecursively(key);
+ return true;
}
+ // Zookeeper execute async callbacks at zookeeper server side. In our first
version of
+ // implementation, we will keep this behavior.
+ // In later version, we may consider creating a thread pool to execute
registered callbacks.
+ // However, this will change metaclient from stateless to stateful.
@Override
public void setAsyncExecPoolSize(int poolSize) {
@@ -156,13 +213,19 @@ public class ZkMetaClient implements MetaClientInterface {
}
@Override
- public boolean connect() {
- return false;
+ public void connect() {
+ // TODO: throws IllegalStateException when already connected
+ try {
+ _zkClient.connect(_connectionTimeout, _zkClient);
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
}
@Override
public void disconnect() {
-
+ // TODO: This is a temp impl for test only. no proper interrupt handling
and error handling.
+ _zkClient.close();
}
@Override
@@ -246,8 +309,8 @@ public class ZkMetaClient implements MetaClientInterface {
}
@Override
- public List<OpResult> transactionOP(Iterable iterable) {
- return null;
+ public void close() {
+ disconnect();
}
/**
@@ -301,4 +364,35 @@ public class ZkMetaClient implements MetaClientInterface {
return _listener.hashCode();
}
}
+
+ private static MetaClientException
translateZkExceptionToMetaclientException(ZkException e) {
+ if (e instanceof ZkNodeExistsException) {
+ return new MetaClientNoNodeException(e);
+ } else if (e instanceof ZkBadVersionException) {
+ return new MetaClientBadVersionException(e);
+ } else if (e instanceof ZkTimeoutException) {
+ return new MetaClientTimeoutException(e);
+ } else if (e instanceof ZkInterruptedException) {
+ return new MetaClientInterruptException(e);
+ } else {
+ return new MetaClientException(e);
+ }
+ }
+
+ private static EntryMode convertZkEntryMode(long ephemeralOwner) {
+ EphemeralType zkEphemeralType = EphemeralType.get(ephemeralOwner);
+ switch (zkEphemeralType) {
+ case VOID:
+ return EntryMode.PERSISTENT;
+ case CONTAINER:
+ return EntryMode.CONTAINER;
+ case NORMAL:
+ return EntryMode.EPHEMERAL;
+ // TODO: TTL is not supported now.
+ //case TTL:
+ // return EntryMode.TTL;
+ default:
+ throw new IllegalArgumentException(zkEphemeralType + " is not
supported.");
+ }
+ }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
index c4190fc84..d9292f846 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
@@ -128,7 +128,7 @@ public class ZkMetaClientConfig extends MetaClientConfig {
}
@Override
- public MetaClientConfig build() {
+ public ZkMetaClientConfig build() {
if (_zkSerializer == null) {
_zkSerializer = new BasicZkSerializer(new SerializableSerializer());
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 4c74fe847..0eab0315e 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -21,18 +21,31 @@ package org.apache.helix.metaclient.impl.zk;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.constants.MetaClientException;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
+
public class TestZkMetaClient {
private static final String ZK_ADDR = "localhost:2183";
private ZkServer _zkServer;
+ private static final String ENTRY_STRING_VALUE = "test-value";
@BeforeClass
public void prepare() {
@@ -40,9 +53,139 @@ public class TestZkMetaClient {
_zkServer = startZkServer(ZK_ADDR);
}
+ @AfterClass
+ public void cleanUp() {
+ _zkServer.shutdown();
+ }
+
@Test
- public void dummyTest() {
- Assert.assertEquals(1+1, 2);
+ public void testGet() {
+ final String key = "/TestZkMetaClient_testGet";
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ String value;
+ zkMetaClient.create(key, ENTRY_STRING_VALUE);
+ String dataValue = zkMetaClient.get(key);
+ Assert.assertEquals(dataValue, ENTRY_STRING_VALUE);
+
+ value = zkMetaClient.get(key + "/a/b/c");
+ Assert.assertNull(value);
+
+ zkMetaClient.delete(key);
+
+ value = zkMetaClient.get(key);
+ Assert.assertNull(value);
+ }
+ }
+
+ @Test
+ public void testSet() {
+ final String key = "/TestZkMetaClient_testSet";
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ zkMetaClient.create(key, ENTRY_STRING_VALUE);
+ String testValueV1 = ENTRY_STRING_VALUE + "-v1";
+ String testValueV2 = ENTRY_STRING_VALUE + "-v2";
+
+ // test set() with no expected version and validate result.
+ zkMetaClient.set(key, testValueV1, -1);
+ Assert.assertEquals(zkMetaClient.get(key), testValueV1);
+ MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
+ Assert.assertEquals(entryStat.getVersion(), 1);
+ Assert.assertEquals(entryStat.getEntryType().name(), PERSISTENT.name());
+
+ // test set() with expected version and validate result and new version
number
+ zkMetaClient.set(key, testValueV2, 1);
+ entryStat = zkMetaClient.exists(key);
+ Assert.assertEquals(zkMetaClient.get(key), testValueV2);
+ Assert.assertEquals(entryStat.getVersion(), 2);
+
+ // test set() with a wrong version
+ try {
+ zkMetaClient.set(key, "test-node-changed", 10);
+ Assert.fail("No reach.");
+ } catch (MetaClientException ex) {
+ Assert.assertEquals(ex.getClass().getName(),
+
"org.apache.helix.metaclient.constants.MetaClientBadVersionException");
+ }
+ zkMetaClient.delete(key);
+ }
+ }
+
+ @Test
+ public void testUpdate() {
+ final String key = "/TestZkMetaClient_testUpdate";
+ ZkMetaClientConfig config =
+ new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+ try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
+ zkMetaClient.connect();
+ int initValue = 3;
+ zkMetaClient.create(key, initValue);
+ MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
+ Assert.assertEquals(entryStat.getVersion(), 0);
+
+ // test update() and validate entry value and version
+ Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ return currentData + 1;
+ }
+ });
+ Assert.assertEquals((int) newData, (int) initValue + 1);
+
+ entryStat = zkMetaClient.exists(key);
+ Assert.assertEquals(entryStat.getVersion(), 1);
+
+ newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+
+ @Override
+ public Integer update(Integer currentData) {
+ return currentData + 1;
+ }
+ });
+
+ entryStat = zkMetaClient.exists(key);
+ Assert.assertEquals(entryStat.getVersion(), 2);
+ Assert.assertEquals((int) newData, (int) initValue + 2);
+ zkMetaClient.delete(key);
+ }
+ }
+
+ @Test
+ public void testGetAndCountChildrenAndRecursiveDelete() {
+ final String key = "/TestZkMetaClient_testGetAndCountChildren";
+ List<String> childrenNames = Arrays.asList("/c1", "/c2", "/c3");
+
+ // create child nodes and validate retrieved children count and names
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ zkMetaClient.create(key, ENTRY_STRING_VALUE);
+ Assert.assertEquals(zkMetaClient.countDirectChildren(key), 0);
+ for (String str : childrenNames) {
+ zkMetaClient.create(key + str, ENTRY_STRING_VALUE);
+ }
+
+ List<String> retrievedChildrenNames =
zkMetaClient.getDirectChildrenKeys(key);
+ Assert.assertEquals(retrievedChildrenNames.size(), childrenNames.size());
+ Set<String> childrenNameSet = new HashSet<>(childrenNames);
+ for (String str : retrievedChildrenNames) {
+ Assert.assertTrue(childrenNameSet.contains("/" + str));
+ }
+
+ // recursive delete and validate
+ Assert.assertEquals(zkMetaClient.countDirectChildren(key),
childrenNames.size());
+ Assert.assertNotNull(zkMetaClient.exists(key));
+ zkMetaClient.recursiveDelete(key);
+ Assert.assertNull(zkMetaClient.exists(key));
+ }
+ }
+
+
+
+ private static ZkMetaClient<String> createZkMetaClient() {
+ ZkMetaClientConfig config =
+ new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+ return new ZkMetaClient<>(config);
}
private static ZkServer startZkServer(final String zkAddress) {
@@ -62,8 +205,8 @@ public class TestZkMetaClient {
};
int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':')
+ 1));
- ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
System.out.println("Starting ZK server at " + zkAddress);
+ ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
zkServer.start();
return zkServer;
}