This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6c3070fc11324898de0d4c2e16e206006f022e2e
Author: xyuanlu <[email protected]>
AuthorDate: Thu Jan 12 09:39:21 2023 -0800

    Meta client - implement CRUD for zkMetaClient
    
    Add basic CRUD for ZkMetaClient and test
---
 .../helix/metaclient/api/MetaClientInterface.java  |  39 +++++-
 .../constants/MetaClientBadVersionException.java   |  20 +++
 .../metaclient/constants/MetaClientException.java  |  19 ++-
 ...tion.java => MetaClientInterruptException.java} |  17 ++-
 ...ception.java => MetaClientNoNodeException.java} |  20 ++-
 ...eption.java => MetaClientTimeoutException.java} |  18 ++-
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 146 ++++++++++++++++----
 .../impl/zk/factory/ZkMetaClientConfig.java        |   2 +-
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 149 ++++++++++++++++++++-
 9 files changed, 387 insertions(+), 43 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5118f6359..b00993b97 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -22,6 +22,9 @@ package org.apache.helix.metaclient.api;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.metaclient.constants.MetaClientInterruptException;
+import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
+
 
 public interface MetaClientInterface<T> {
 
@@ -34,9 +37,21 @@ public interface MetaClientInterface<T> {
     // An ephemeral node cannot have sub entry.
     PERSISTENT,
 
-    // The node will not be automatically deleted when the last sub-entry of 
the node is deleted.
+    // For metadata storage that has hierarchical key space (e.g. ZK), the 
node will be
+    // automatically deleted at some point in the future if the last child of 
the node is deleted.
+    // For metadata storage that has non-hierarchical key space (e.g. etcd), 
the node will be
+    // automatically deleted at some point in the future if the last entry 
that has the prefix
+    // is deleted.
     // The node is an ephemeral node.
-    CONTAINER
+    CONTAINER,
+
+    // For metadata storage that has hierarchical key space (e.g. ZK), if the 
entry is not modified
+    // within the TTL and has no children it will become a candidate to be 
deleted by the server
+    // at some point in the future.
+    // For metadata storage that has non-hierarchical key space (e.g. etcd), if 
the entry is not modified
+    // within the TTL, it will become a candidate to be deleted by the server 
at some point in the
+    // future.
+    TTL
   }
 
   enum ConnectState {
@@ -71,6 +86,11 @@ public interface MetaClientInterface<T> {
     public int getVersion() {
       return _version;
     }
+
+    public Stat (EntryMode mode, int version) {
+      _version = version;
+      _entryMode = mode;
+    }
   }
 
   //synced CRUD API
@@ -143,7 +163,7 @@ public interface MetaClientInterface<T> {
    * @eturn Return a list of children keys. Return direct child name only for 
hierarchical key
    *        space, return the whole sub key for non-hierarchical key space.
    */
-  List<String> getDirestChildrenKeys(final String key);
+  List<String> getDirectChildrenKeys(final String key);
 
   /**
    * Return the number of children for the given keys.
@@ -152,7 +172,7 @@ public interface MetaClientInterface<T> {
    *            For metadata storage that has non-hierarchical key space (e.g. 
etcd), the key would
    *            be a prefix key.
    */
-  int countDirestChildren(final String key);
+  int countDirectChildren(final String key);
 
   /**
    * Remove the entry associated with the given key.
@@ -332,9 +352,14 @@ public interface MetaClientInterface<T> {
   /**
    * Maintains a connection with underlying metadata service based on config 
params. Connection
    * created by this method will be used to perform CRUD operations on 
metadata service.
-   * @return True if connection is successfully established.
-   */
-  boolean connect();
+   * @throws MetaClientInterruptException
+   *          if the calling thread is interrupted while waiting for the connection
+   * @throws MetaClientTimeoutException
+   *          if the connection timed out
+   * @throws IllegalStateException
+   *         if already connected or the connection is already closed 
explicitly
+   */
+  void connect();
 
   /**
    * Disconnect from server explicitly.
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
new file mode 100644
index 000000000..4a7f4a0b2
--- /dev/null
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientBadVersionException.java
@@ -0,0 +1,20 @@
+package org.apache.helix.metaclient.constants;
+
+public final class MetaClientBadVersionException extends MetaClientException {
+  public MetaClientBadVersionException() {
+    super();
+  }
+
+  public MetaClientBadVersionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MetaClientBadVersionException(String message) {
+    super(message);
+  }
+
+  public MetaClientBadVersionException(Throwable cause) {
+    super(cause);
+  }
+
+}
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
index cf8a07c56..5ace636e3 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
@@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientException {
-}
+public class MetaClientException extends RuntimeException {
+  public MetaClientException() {
+    super();
+  }
+
+  public MetaClientException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MetaClientException(String message) {
+    super(message);
+  }
+
+  public MetaClientException(Throwable cause) {
+    super(cause);
+  }
+}
\ No newline at end of file
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
similarity index 68%
copy from 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
index cf8a07c56..3f52e3ad9 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientInterruptException.java
@@ -19,5 +19,20 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientException {
+public final class MetaClientInterruptException extends MetaClientException {
+  public MetaClientInterruptException() {
+    super();
+  }
+
+  public MetaClientInterruptException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MetaClientInterruptException(String message) {
+    super(message);
+  }
+
+  public MetaClientInterruptException(Throwable cause) {
+    super(cause);
+  }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
similarity index 68%
copy from 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
index cf8a07c56..b9cfb4b9f 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientNoNodeException.java
@@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientException {
-}
+public final class MetaClientNoNodeException extends MetaClientException {
+  public MetaClientNoNodeException() {
+    super();
+  }
+
+  public MetaClientNoNodeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MetaClientNoNodeException(String message) {
+    super(message);
+  }
+
+  public MetaClientNoNodeException(Throwable cause) {
+    super(cause);
+  }
+
+}
\ No newline at end of file
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
similarity index 68%
copy from 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
copy to 
meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
index cf8a07c56..6f1c381a4 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientTimeoutException.java
@@ -19,5 +19,21 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientException {
+public final class MetaClientTimeoutException extends MetaClientException {
+  public MetaClientTimeoutException() {
+    super();
+  }
+
+  public MetaClientTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public MetaClientTimeoutException(String message) {
+    super(message);
+  }
+
+  public MetaClientTimeoutException(Throwable cause) {
+    super(cause);
+  }
+
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index b3270ad81..35529454a 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -30,76 +30,133 @@ import org.apache.helix.metaclient.api.DataUpdater;
 import org.apache.helix.metaclient.api.DirectChildChangeListener;
 import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
 import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.constants.MetaClientBadVersionException;
+import org.apache.helix.metaclient.constants.MetaClientException;
+import org.apache.helix.metaclient.constants.MetaClientInterruptException;
+import org.apache.helix.metaclient.constants.MetaClientNoNodeException;
+import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.EphemeralType;
 
 
-public class ZkMetaClient implements MetaClientInterface {
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   private final ZkClient _zkClient;
+  private final int _connectionTimeout;
 
   public ZkMetaClient(ZkMetaClientConfig config) {
-    _zkClient =  new ZkClient(new ZkConnection(config.getConnectionAddress(),
-        (int) config.getSessionTimeoutInMillis()),
-        (int) config.getConnectionInitTimeoutInMillis(), -1 
/*operationRetryTimeout*/,
-        config.getZkSerializer(), config.getMonitorType(), 
config.getMonitorKey(),
-        config.getMonitorInstanceName(), config.getMonitorRootPathOnly());
+    _connectionTimeout = (int) config.getConnectionInitTimeoutInMillis();
+    _zkClient = new ZkClient(
+        new ZkConnection(config.getConnectionAddress(), (int) 
config.getSessionTimeoutInMillis()),
+        _connectionTimeout, -1 /*operationRetryTimeout*/, 
config.getZkSerializer(),
+        config.getMonitorType(), config.getMonitorKey(), 
config.getMonitorInstanceName(),
+        config.getMonitorRootPathOnly(), false);
   }
 
   @Override
-  public void create(String key, Object data) {
-
+  public void create(String key, T data) {
+    // TODO: This function is implemented only for test. It does not have 
proper error handling
+    _zkClient.create(key, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
   }
 
   @Override
-  public void create(String key, Object data, EntryMode mode) {
+  public void create(String key, T data, EntryMode mode) {
 
   }
 
   @Override
-  public void set(String key, Object data, int version) {
-
+  public void set(String key, T data, int version) {
+    try {
+      _zkClient.writeData(key, data, version);
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 
   @Override
-  public Object update(String key, DataUpdater updater) {
-    return null;
+  public T update( String key, DataUpdater<T> updater) {
+    org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
+    // TODO: add retry logic for ZkBadVersionException.
+    try {
+      T oldData = _zkClient.readData(key, stat);
+      T newData = updater.update(oldData);
+      set(key, newData, stat.getVersion());
+      return newData;
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 
   @Override
   public Stat exists(String key) {
-    return null;
+    org.apache.zookeeper.data.Stat zkStats;
+    try {
+      zkStats = _zkClient.getStat(key);
+      if (zkStats == null) {
+        return null;
+      }
+      return new Stat(convertZkEntryMode(zkStats.getEphemeralOwner()), 
zkStats.getVersion());
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 
   @Override
-  public Object get(String key) {
-    return null;
+  public T get(String key) {
+    return _zkClient.readData(key, true);
   }
 
   @Override
-  public List<String> getDirestChildrenKeys(String key) {
+  public List<OpResult> transactionOP(Iterable<Op> ops) {
     return null;
   }
 
   @Override
-  public int countDirestChildren(String key) {
-    return 0;
+  public List<String> getDirectChildrenKeys(String key) {
+    try {
+      return _zkClient.getChildren(key);
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
+  }
+
+  @Override
+  public int countDirectChildren(String key) {
+    return _zkClient.countChildren(key);
   }
 
   @Override
   public boolean delete(String key) {
-    return false;
+    try {
+      return _zkClient.delete(key);
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 
   @Override
   public boolean recursiveDelete(String key) {
-    return false;
+    _zkClient.deleteRecursively(key);
+    return true;
   }
 
+  // Zookeeper executes async callbacks at zookeeper server side. In our first 
version of
+  // implementation, we will keep this behavior.
+  // In a later version, we may consider creating a thread pool to execute 
registered callbacks.
+  // However, this will change metaclient from stateless to stateful.
   @Override
   public void setAsyncExecPoolSize(int poolSize) {
 
@@ -156,13 +213,19 @@ public class ZkMetaClient implements MetaClientInterface {
   }
 
   @Override
-  public boolean connect() {
-    return false;
+  public void connect() {
+    // TODO: throw IllegalStateException when already connected
+    try {
+      _zkClient.connect(_connectionTimeout, _zkClient);
+    } catch (ZkException e) {
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 
   @Override
   public void disconnect() {
-
+    // TODO: This is a temporary impl for test only. It has no proper interrupt handling 
or error handling.
+    _zkClient.close();
   }
 
   @Override
@@ -246,8 +309,8 @@ public class ZkMetaClient implements MetaClientInterface {
   }
 
   @Override
-  public List<OpResult> transactionOP(Iterable iterable) {
-    return null;
+  public void close() {
+    disconnect();
   }
 
   /**
@@ -301,4 +364,35 @@ public class ZkMetaClient implements MetaClientInterface {
       return _listener.hashCode();
     }
   }
+
+  private static MetaClientException 
translateZkExceptionToMetaclientException(ZkException e) { // maps Zk* to MetaClient* exceptions
+    if (e instanceof org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException) {
+      return new MetaClientNoNodeException(e); // node is missing (was wrongly keyed on node-exists)
+    } else if (e instanceof ZkBadVersionException) {
+      return new MetaClientBadVersionException(e); // expected version did not match
+    } else if (e instanceof ZkTimeoutException) {
+      return new MetaClientTimeoutException(e);
+    } else if (e instanceof ZkInterruptedException) {
+      return new MetaClientInterruptException(e);
+    } else { // everything else, incl. ZkNodeExistsException (no dedicated MetaClient type yet)
+      return new MetaClientException(e); // original exception is kept as the cause
+    }
+  }
+
+  private static EntryMode convertZkEntryMode(long ephemeralOwner) {
+    EphemeralType zkEphemeralType = EphemeralType.get(ephemeralOwner);
+    switch (zkEphemeralType) {
+      case VOID:
+        return EntryMode.PERSISTENT;
+      case CONTAINER:
+        return EntryMode.CONTAINER;
+      case NORMAL:
+        return EntryMode.EPHEMERAL;
+      // TODO: TTL is not supported now.
+      //case TTL:
+      //  return EntryMode.TTL;
+      default:
+        throw new IllegalArgumentException(zkEphemeralType + " is not 
supported.");
+    }
+  }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
index c4190fc84..d9292f846 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
@@ -128,7 +128,7 @@ public class ZkMetaClientConfig extends MetaClientConfig {
     }
 
     @Override
-    public MetaClientConfig build() {
+    public ZkMetaClientConfig build() {
       if (_zkSerializer == null) {
         _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
       }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 4c74fe847..0eab0315e 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -21,18 +21,31 @@ package org.apache.helix.metaclient.impl.zk;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.constants.MetaClientException;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
+
 
 public class TestZkMetaClient {
 
   private static final String ZK_ADDR = "localhost:2183";
   private ZkServer _zkServer;
+  private static final String ENTRY_STRING_VALUE = "test-value";
 
   @BeforeClass
   public void prepare() {
@@ -40,9 +53,139 @@ public class TestZkMetaClient {
     _zkServer = startZkServer(ZK_ADDR);
   }
 
+  @AfterClass
+  public void cleanUp() {
+    _zkServer.shutdown();
+  }
+
   @Test
-  public void dummyTest() {
-    Assert.assertEquals(1+1, 2);
+  public void testGet() {
+    final String key = "/TestZkMetaClient_testGet";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      String value;
+      zkMetaClient.create(key, ENTRY_STRING_VALUE);
+      String dataValue = zkMetaClient.get(key);
+      Assert.assertEquals(dataValue, ENTRY_STRING_VALUE);
+
+      value = zkMetaClient.get(key + "/a/b/c");
+      Assert.assertNull(value);
+
+      zkMetaClient.delete(key);
+
+      value = zkMetaClient.get(key);
+      Assert.assertNull(value);
+    }
+  }
+
+  @Test
+  public void testSet() {
+    final String key = "/TestZkMetaClient_testSet";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      zkMetaClient.create(key, ENTRY_STRING_VALUE);
+      String testValueV1 = ENTRY_STRING_VALUE + "-v1";
+      String testValueV2 = ENTRY_STRING_VALUE + "-v2";
+
+      // test set() with no expected version and validate result.
+      zkMetaClient.set(key, testValueV1, -1);
+      Assert.assertEquals(zkMetaClient.get(key), testValueV1);
+      MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
+      Assert.assertEquals(entryStat.getVersion(), 1);
+      Assert.assertEquals(entryStat.getEntryType().name(), PERSISTENT.name());
+
+      // test set() with expected version and validate result and new version 
number
+      zkMetaClient.set(key, testValueV2, 1);
+      entryStat = zkMetaClient.exists(key);
+      Assert.assertEquals(zkMetaClient.get(key), testValueV2);
+      Assert.assertEquals(entryStat.getVersion(), 2);
+
+      // test set() with a wrong version
+      try {
+        zkMetaClient.set(key, "test-node-changed", 10);
+        Assert.fail("No reach.");
+      } catch (MetaClientException ex) {
+        Assert.assertEquals(ex.getClass().getName(),
+            
"org.apache.helix.metaclient.constants.MetaClientBadVersionException");
+      }
+      zkMetaClient.delete(key);
+    }
+  }
+
+  @Test
+  public void testUpdate() {
+    final String key = "/TestZkMetaClient_testUpdate";
+    ZkMetaClientConfig config =
+        new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+    try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
+      zkMetaClient.connect();
+      int initValue = 3;
+      zkMetaClient.create(key, initValue);
+      MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
+      Assert.assertEquals(entryStat.getVersion(), 0);
+
+      // test update() and validate entry value and version
+      Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          return currentData + 1;
+        }
+      });
+      Assert.assertEquals((int) newData, (int) initValue + 1);
+
+      entryStat = zkMetaClient.exists(key);
+      Assert.assertEquals(entryStat.getVersion(), 1);
+
+      newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+
+        @Override
+        public Integer update(Integer currentData) {
+          return currentData + 1;
+        }
+      });
+
+      entryStat = zkMetaClient.exists(key);
+      Assert.assertEquals(entryStat.getVersion(), 2);
+      Assert.assertEquals((int) newData, (int) initValue + 2);
+      zkMetaClient.delete(key);
+    }
+  }
+
+  @Test
+  public void testGetAndCountChildrenAndRecursiveDelete() {
+    final String key = "/TestZkMetaClient_testGetAndCountChildren";
+    List<String> childrenNames = Arrays.asList("/c1", "/c2", "/c3");
+
+    // create child nodes and validate retrieved children count and names
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      zkMetaClient.create(key, ENTRY_STRING_VALUE);
+      Assert.assertEquals(zkMetaClient.countDirectChildren(key), 0);
+      for (String str : childrenNames) {
+        zkMetaClient.create(key + str, ENTRY_STRING_VALUE);
+      }
+
+      List<String> retrievedChildrenNames = 
zkMetaClient.getDirectChildrenKeys(key);
+      Assert.assertEquals(retrievedChildrenNames.size(), childrenNames.size());
+      Set<String> childrenNameSet = new HashSet<>(childrenNames);
+      for (String str : retrievedChildrenNames) {
+        Assert.assertTrue(childrenNameSet.contains("/" + str));
+      }
+
+      // recursive delete and validate
+      Assert.assertEquals(zkMetaClient.countDirectChildren(key), 
childrenNames.size());
+      Assert.assertNotNull(zkMetaClient.exists(key));
+      zkMetaClient.recursiveDelete(key);
+      Assert.assertNull(zkMetaClient.exists(key));
+    }
+  }
+
+
+
+  private static ZkMetaClient<String> createZkMetaClient() {
+    ZkMetaClientConfig config =
+        new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+    return new ZkMetaClient<>(config);
   }
 
   private static ZkServer startZkServer(final String zkAddress) {
@@ -62,8 +205,8 @@ public class TestZkMetaClient {
     };
 
     int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') 
+ 1));
-    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
     System.out.println("Starting ZK server at " + zkAddress);
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
     zkServer.start();
     return zkServer;
   }

Reply via email to