This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0b603cbcae422d1be448cc670039076f14708344 Author: xyuanlu <[email protected]> AuthorDate: Wed Feb 22 14:33:21 2023 -0800 Implement zk Meta client async crud (#2354) Implement zk Meta client async crud --- .../apache/helix/metaclient/api/AsyncCallback.java | 2 +- .../helix/metaclient/api/MetaClientInterface.java | 18 ++ .../helix/metaclient/impl/zk/ZkMetaClient.java | 88 +++++--- .../zk/adapter/ZkMetaClientSetCallbackHandler.java | 42 ++++ .../helix/metaclient/impl/zk/TestZkMetaClient.java | 67 +----- .../impl/zk/TestZkMetaClientAsyncOperations.java | 233 +++++++++++++++++++++ .../metaclient/impl/zk/ZkMetaClientTestBase.java | 91 ++++++++ 7 files changed, 457 insertions(+), 84 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java index 1e7c1eb75..ae7aed59b 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java @@ -28,8 +28,8 @@ import javax.annotation.Nullable; * The corresponding callback is registered when async CRUD API is invoked. Implementation processes * the result of each CRUD call. It should check return code and perform accordingly. */ -// TODO: define return code. failure code should map to MetaClient exceptions. public interface AsyncCallback { + //This callback is used when stat object is returned from the operation. 
interface StatCallback extends AsyncCallback { /** diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index a4c5113f2..af9a170b4 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -515,5 +515,23 @@ public interface MetaClientInterface<T> { */ boolean waitUntilExists(String key, TimeUnit timeUnit, long timeOut); + /** + * Serialize data of type T to a byte array. This function can be used with APIs that return or + * accept values in byte array format. + * @param data to be serialized. + * @param path key (path) of the entry the data belongs to. + * @return the serialized byte array. + */ + byte[] serialize(T data, String path); + + /** + * Deserialize a byte array to data of type T. This function can be used with APIs that return or + * accept values in byte array format. + * @param bytes to be deserialized. 
+ * @param path key (path) of the entry the bytes were read from. + * @return the deserialized data of type T. + */ + T deserialize(byte[] bytes, String path); + // TODO: Secure CRUD APIs } \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 875eb6a50..f520e319b 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -36,6 +36,11 @@ import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter; import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter; +import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler; +import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler; +import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler; +import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientGetCallbackHandler; +import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientSetCallbackHandler; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil; import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; @@ -51,7 +56,7 @@ import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZ import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException; public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class); + private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class); private final ZkClient _zkClient; private final int _connectionTimeout; @@ -93,7 +98,7 @@ 
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } @Override - public T update( String key, DataUpdater<T> updater) { + public T update(String key, DataUpdater<T> updater) { org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); // TODO: add retry logic for ZkBadVersionException. try { @@ -161,63 +166,76 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { // thread. In our first version of implementation, we will keep similar behavior and have // callbacks executed in ZkClient event thread, and reuse zkclient retry logic. - // It is highly recommended NOT to perform any blocking operation inside the callbacks. + // It is highly recommended *NOT* to perform any blocking operation inside the callbacks. // If you block the thread the meta client won't process other events. // corresponding callbacks for each operation are invoked in order. @Override public void setAsyncExecPoolSize(int poolSize) { - + throw new UnsupportedOperationException( + "All async calls are executed in a single thread to maintain sequence."); } @Override public void asyncCreate(String key, Object data, EntryMode mode, AsyncCallback.VoidCallback cb) { - - } - - @Override - public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) { - + CreateMode entryMode; + try { + entryMode = ZkMetaClientUtil.convertMetaClientMode(mode); + } catch (ZkException | KeeperException e) { + throw new MetaClientException(e); + } + _zkClient.asyncCreate(key, data, entryMode, + new ZkMetaClientCreateCallbackHandler(cb)); } @Override - public void asyncUpdate(String key, DataUpdater updater, AsyncCallback.DataCallback cb) { - + public void asyncUpdate(String key, DataUpdater<T> updater, AsyncCallback.DataCallback cb) { + throw new NotImplementedException("Currently asyncUpdate is not supported in ZkMetaClient."); + /* + * TODO: Only Helix has potential using this API as of now. 
(ZkBaseDataAccessor.update()) + * Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor. + */ } @Override public void asyncGet(String key, AsyncCallback.DataCallback cb) { - + _zkClient.asyncGetData(key, + new ZkMetaClientGetCallbackHandler(cb)); } @Override public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) { + throw new NotImplementedException( + "Currently asyncCountChildren is not supported in ZkMetaClient."); + /* + * TODO: Only Helix has potential using this API as of now. (ZkBaseDataAccessor.getChildren()) + * Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor. + */ } @Override public void asyncExist(String key, AsyncCallback.StatCallback cb) { - + _zkClient.asyncExists(key, + new ZkMetaClientExistCallbackHandler(cb)); } - @Override - public void asyncDelete(String keys, AsyncCallback.VoidCallback cb) { - + public void asyncDelete(String key, AsyncCallback.VoidCallback cb) { + _zkClient.asyncDelete(key, new ZkMetaClientDeleteCallbackHandler(cb)); } @Override - public boolean[] create(List key, List data, List mode) { - return new boolean[0]; - } + public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) { + throw new NotImplementedException( + "Currently asyncTransaction is not supported in ZkMetaClient."); - @Override - public boolean[] create(List key, List data) { - return new boolean[0]; + //TODO: There is no active use case for Async transaction. 
} @Override - public void asyncTransaction(Iterable iterable, AsyncCallback.TransactionCallback cb) { - + public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) { + _zkClient.asyncSetData(key, data, version, + new ZkMetaClientSetCallbackHandler(cb)); } @Override @@ -290,6 +308,16 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { return false; } + @Override + public boolean[] create(List key, List data, List mode) { + return new boolean[0]; + } + + @Override + public boolean[] create(List key, List data) { + return new boolean[0]; + } + @Override public boolean[] delete(List keys) { return new boolean[0]; @@ -329,4 +357,14 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { // Convert list of Zk OpResults to MetaClient OpResults return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(zkResult); } + + @Override + public byte[] serialize(T data, String path) { + return _zkClient.serialize(data, path); + } + + @Override + public T deserialize(byte[] bytes, String path) { + return _zkClient.deserialize(bytes, path); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java new file mode 100644 index 000000000..60eecc7f8 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java @@ -0,0 +1,42 @@ +package org.apache.helix.metaclient.impl.zk.adapter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.AsyncCallback; +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil; +import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; + + +public class ZkMetaClientSetCallbackHandler extends ZkAsyncCallbacks.SetDataCallbackHandler { + AsyncCallback.StatCallback userCallback; + + public ZkMetaClientSetCallbackHandler(AsyncCallback.StatCallback cb) { + userCallback = cb; + } + + @Override + public void handle() { + userCallback.processResult(getRc(), getPath(), getStat() == null ? 
null + : new MetaClientInterface.Stat( + ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode(getStat().getEphemeralOwner()), + getStat().getVersion())); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 70a7afccd..f23ed0f03 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -27,6 +27,13 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.helix.metaclient.api.DataUpdater; +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import org.apache.commons.io.FileUtils; import org.apache.helix.metaclient.api.DataUpdater; import org.apache.helix.metaclient.api.DirectChildChangeListener; @@ -55,7 +62,7 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONT import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT; -public class TestZkMetaClient { +public class TestZkMetaClient extends ZkMetaClientTestBase{ private static final String ZK_ADDR = "localhost:2183"; private static final int DEFAULT_TIMEOUT_MS = 1000; @@ -65,31 +72,6 @@ public class TestZkMetaClient { private final Object _syncObject = new Object(); - - private ZkServer _zkServer; - - /** - * Creates local Zk Server - * Note: Cannot test container / TTL node end to end behavior as - * the zk server setup doesn't allow for that. To enable this, zk server - * setup must invoke ContainerManager.java. However, the actual - * behavior has been verified to work on native ZK Client. - * TODO: Modify zk server setup to include ContainerManager. 
- * This can be done through ZooKeeperServerMain.java or - * LeaderZooKeeperServer.java. - */ - @BeforeClass - public void prepare() { - System.setProperty("zookeeper.extendedTypesEnabled", "true"); - // start local zookeeper server - _zkServer = startZkServer(ZK_ADDR); - } - - @AfterClass - public void cleanUp() { - _zkServer.shutdown(); - } - @Test public void testCreate() { final String key = "/TestZkMetaClient_testCreate"; @@ -115,7 +97,7 @@ public class TestZkMetaClient { Assert.assertNotNull(zkMetaClient.exists(key)); } } - + @Test public void testGet() { final String key = "/TestZkMetaClient_testGet"; @@ -352,37 +334,6 @@ public class TestZkMetaClient { } } - // TODO: Create a ZkMetadata test base class and move these helper to base class when more tests - // are added. - private static ZkMetaClient<String> createZkMetaClient() { - ZkMetaClientConfig config = - new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build(); - return new ZkMetaClient<>(config); - } - - private static ZkServer startZkServer(final String zkAddress) { - String zkDir = zkAddress.replace(':', '_'); - final String logDir = "/tmp/" + zkDir + "/logs"; - final String dataDir = "/tmp/" + zkDir + "/dataDir"; - - // Clean up local directory - try { - FileUtils.deleteDirectory(new File(dataDir)); - FileUtils.deleteDirectory(new File(logDir)); - } catch (IOException e) { - e.printStackTrace(); - } - - IDefaultNameSpace defaultNameSpace = zkClient -> { - }; - - int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1)); - System.out.println("Starting ZK server at " + zkAddress); - ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); - zkServer.start(); - return zkServer; - } - /** * Transactional op calls zk.multi() with a set of ops (operations) * and the return values are converted into metaclient opResults. 
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java new file mode 100644 index 000000000..fbfd8ed35 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java @@ -0,0 +1,233 @@ +package org.apache.helix.metaclient.impl.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import org.apache.helix.metaclient.api.AsyncCallback; +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.zookeeper.KeeperException; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestZkMetaClientAsyncOperations extends ZkMetaClientTestBase { + + static TestAsyncContext[] asyncContext = new TestAsyncContext[1]; + static final String entryKey = "/TestAsyncEntryKey"; + static final String nonExistsEntry = "/a/b/c"; + static final long LATCH_WAIT_TIMEOUT_IN_S = 3 * 60; + + static class TestAsyncContext { + int _asyncCallSize; + CountDownLatch _countDownLatch; + int[] _returnCode; + MetaClientInterface.Stat[] _stats; + String[] _data; + + TestAsyncContext(int callSize) { + _asyncCallSize = callSize; + _countDownLatch = new CountDownLatch(callSize); + _returnCode = new int[callSize]; + _stats = new MetaClientInterface.Stat[callSize]; + _data = new String[callSize]; + } + + public CountDownLatch getCountDownLatch() { + return _countDownLatch; + } + + public void countDown() { + _countDownLatch.countDown(); + } + + public int getReturnCode(int idx) { + return _returnCode[idx]; + } + + public MetaClientInterface.Stat getStats(int idx) { + return _stats[idx]; + } + + public String getData(int idx) { + return _data[idx]; + } + + public void setReturnCodeWhenFinished(int idx, int returnCode) { + _returnCode[idx] = returnCode; + } + + public void setStatWhenFinished(int idx, MetaClientInterface.Stat stat) { + _stats[idx] = stat; + } + + public void setDataWhenFinished(int idx, String data) { + _data[idx] = data; + } + } + + @Test + public void testAsyncCreateSetAndGet() { + asyncContext[0] = new TestAsyncContext(2); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + + zkMetaClient + .asyncCreate(entryKey, "async_create-data", 
MetaClientInterface.EntryMode.PERSISTENT, + new AsyncCallback.VoidCallback() { + @Override + public void processResult(int returnCode, String key) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].countDown(); + } + }); + + zkMetaClient.asyncCreate(nonExistsEntry, "async_create-data-invalid", + MetaClientInterface.EntryMode.PERSISTENT, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int returnCode, String key) { + asyncContext[0].setReturnCodeWhenFinished(1, returnCode); + asyncContext[0].countDown(); + } + }); + + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + + Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue()); + Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue()); + + // create the entry again and expect a duplicated error code + asyncContext[0] = new TestAsyncContext(1); + zkMetaClient + .asyncCreate(entryKey, "async_create-data", MetaClientInterface.EntryMode.PERSISTENT, + new AsyncCallback.VoidCallback() { + @Override + public void processResult(int returnCode, String key) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].countDown(); + } + }); + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + Assert.assertEquals(asyncContext[0].getReturnCode(0), + KeeperException.Code.NODEEXISTS.intValue()); + + + // test set + asyncContext[0] = new TestAsyncContext(1); + zkMetaClient + .asyncSet(entryKey, "async_create-data-new", 0, + new AsyncCallback.StatCallback() { + @Override + public void processResult(int returnCode, String key, + @Nullable MetaClientInterface.Stat stat) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].setStatWhenFinished(0, stat); + asyncContext[0].countDown(); + } + }); + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + 
Assert.assertEquals(asyncContext[0].getReturnCode(0), + KeeperException.Code.OK.intValue()); + Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(), + MetaClientInterface.EntryMode.PERSISTENT); + Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1); + + // test get + asyncContext[0] = new TestAsyncContext(1); + zkMetaClient.asyncGet(entryKey, new AsyncCallback.DataCallback() { + @Override + public void processResult(int returnCode, String key, byte[] data, + MetaClientInterface.Stat stat) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].setStatWhenFinished(0, stat); + asyncContext[0].setDataWhenFinished(0, zkMetaClient.deserialize(data, key)); + asyncContext[0].countDown(); + } + }); + + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + + Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue()); + Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(), + MetaClientInterface.EntryMode.PERSISTENT); + Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1); + Assert.assertEquals(asyncContext[0].getData(0), "async_create-data-new"); + } catch (Exception ex) { + Assert.fail("Test testAsyncCreate failed because of:", ex); + } + } + + @Test(dependsOnMethods = "testAsyncCreateSetAndGet") + public void testAsyncExistsAndDelete() { + asyncContext[0] = new TestAsyncContext(2); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + + zkMetaClient.asyncExist(entryKey, new AsyncCallback.StatCallback() { + @Override + public void processResult(int returnCode, String key, MetaClientInterface.Stat stat) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].setStatWhenFinished(0, stat); + asyncContext[0].countDown(); + } + }); + + zkMetaClient.asyncExist(nonExistsEntry, new AsyncCallback.StatCallback() { + @Override + public void processResult(int returnCode, String key, 
MetaClientInterface.Stat stat) { + asyncContext[0].setReturnCodeWhenFinished(1, returnCode); + asyncContext[0].setStatWhenFinished(1, stat); + asyncContext[0].countDown(); + } + }); + + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + + Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue()); + Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(), + MetaClientInterface.EntryMode.PERSISTENT); + Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1); + Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue()); + Assert.assertNull(asyncContext[0].getStats(1)); + + // test delete + asyncContext[0] = new TestAsyncContext(1); + zkMetaClient.asyncDelete(entryKey, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int returnCode, String key) { + asyncContext[0].setReturnCodeWhenFinished(0, returnCode); + asyncContext[0].countDown(); + } + }); + + asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS); + + Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue()); + + // node should not be there + Assert.assertNull(zkMetaClient.get(entryKey)); + } catch (InterruptedException ex) { + Assert.fail("Test testAsyncCreate failed because of:", ex); + } + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java new file mode 100644 index 000000000..cbc9832e1 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java @@ -0,0 +1,91 @@ +package org.apache.helix.metaclient.impl.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; +import org.apache.helix.zookeeper.zkclient.ZkServer; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; + + +public abstract class ZkMetaClientTestBase { + + protected static final String ZK_ADDR = "localhost:2183"; + protected static final int DEFAULT_TIMEOUT_MS = 1000; + protected static final String ENTRY_STRING_VALUE = "test-value"; + private static ZkServer _zkServer; + + /** + * Creates local Zk Server + * Note: Cannot test container / TTL node end to end behavior as + * the zk server setup doesn't allow for that. To enable this, zk server + * setup must invoke ContainerManager.java. However, the actual + * behavior has been verified to work on native ZK Client. + * TODO: Modify zk server setup to include ContainerManager. + * This can be done through ZooKeeperServerMain.java or + * LeaderZooKeeperServer.java. 
+ */ + @BeforeSuite + public void prepare() { + // start local zookeeper server + _zkServer = startZkServer(ZK_ADDR); + } + + @AfterSuite + public void cleanUp() { + _zkServer.shutdown(); + } + + protected static ZkMetaClient<String> createZkMetaClient() { + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) + //.setZkSerializer(new TestStringSerializer()) + .build(); + return new ZkMetaClient<>(config); + } + + protected static ZkServer startZkServer(final String zkAddress) { + String zkDir = zkAddress.replace(':', '_'); + final String logDir = "/tmp/" + zkDir + "/logs"; + final String dataDir = "/tmp/" + zkDir + "/dataDir"; + + // Clean up local directory + try { + FileUtils.deleteDirectory(new File(dataDir)); + FileUtils.deleteDirectory(new File(logDir)); + } catch (IOException e) { + e.printStackTrace(); + } + + IDefaultNameSpace defaultNameSpace = zkClient -> { + }; + + int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1)); + System.out.println("Starting ZK server at " + zkAddress); + ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); + zkServer.start(); + return zkServer; + } +}
