This is an automated email from the ASF dual-hosted git repository. nealsun pushed a commit to branch zookeeper-api-ttlcontainer in repository https://gitbox.apache.org/repos/asf/helix.git
commit d820d29f85f880f1c88951503bd802309eec6801 Author: Ramin Bashizade <[email protected]> AuthorDate: Tue May 24 10:08:20 2022 -0700 Add TTL and Container modes to BaseDataAccessor and its implementations (#2107) This commit adds support for TTL and Container modes to BaseDataAccessor and its implementations by taking advantage of relevant API from ZkClient and its descendent classes. --- .../java/org/apache/helix/BaseDataAccessor.java | 23 +++++ .../helix/manager/zk/ZkBaseDataAccessor.java | 66 +++++++++++-- .../helix/manager/zk/ZkCacheBaseDataAccessor.java | 19 +++- .../helix/manager/zk/TestZkBaseDataAccessor.java | 109 +++++++++++++++++++++ .../apache/helix/mock/MockBaseDataAccessor.java | 11 +++ 5 files changed, 217 insertions(+), 11 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java index e556ccfed..9c639d817 100644 --- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java @@ -43,6 +43,18 @@ public interface BaseDataAccessor<T> { */ boolean create(String path, T record, int options); + /** + * This will always attempt to create the znode, if it exists it will return false. Will + * create parents if they do not exist. For performance reasons, it may try to create + * child first and only if it fails it will try to create parents + * @param path path to the ZNode to create + * @param record the data to write to the ZNode + * @param options Set the type of ZNode see the valid values in {@link AccessOption} + * @param ttl TTL of the node in milliseconds, if options supports it + * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists) + */ + boolean create(String path, T record, int options, long ttl); + /** * This will always attempt to set the data on existing node. If the ZNode does not * exist it will create it and all its parents ZNodes if necessary @@ -95,6 +107,17 @@ public interface BaseDataAccessor<T> { */ boolean[] createChildren(List<String> paths, List<T> records, int options); + /** + * Use it when creating children under a parent node. This will use async api for better + * performance. If the child already exists it will return false. + * @param paths the paths to the children ZNodes + * @param records List of data to write to each of the path + * @param options Set the type of ZNode see the valid values in {@link AccessOption} + * @param ttl TTL of the node in milliseconds, if options supports it + * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists) + */ + boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl); + /** * can set multiple children under a parent node. This will use async api for better * performance. If this child does not exist it will create it. diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index 4e40d413c..0c7a94f1b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -47,6 +47,7 @@ import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.ZkClient; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.helix.zookeeper.zkclient.exception.ZkException; @@ -249,7 +250,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { */ @Override public boolean create(String path, T record, int options) { - AccessResult result = doCreate(path, record, options); + return create(path, record, options, ZkClient.TTL_NOT_SET); + } + + /** + * sync create with TTL + */ + @Override + public boolean create(String path, T record, int options, long ttl) { + AccessResult result = doCreate(path, record, options, ttl); return result._retCode == RetCode.OK; } @@ -257,6 +266,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * sync create */ public AccessResult doCreate(String path, T record, int options) { + return doCreate(path, record, options, ZkClient.TTL_NOT_SET); + } + + /** + * sync create with TTL + */ + public AccessResult doCreate(String path, T record, int options, long ttl) { AccessResult result = new AccessResult(); CreateMode mode = AccessOption.getMode(options); if (mode == null) { @@ -269,7 +285,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { do { retry = false; try { - _zkClient.create(path, record, mode); + _zkClient.create(path, record, mode, ttl); result._pathCreated.add(path); result._retCode = RetCode.OK; @@ -278,7 +294,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { // this will happen if parent node does not exist String parentPath = HelixUtil.getZkParentPath(path); try { - AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT); + AccessResult res; + if (mode.isTTL()) { + res = doCreate(parentPath, null, options, ttl); + } else if (mode.isContainer()) { + res = doCreate(parentPath, null, AccessOption.CONTAINER); + } else { + res = doCreate(parentPath, null, AccessOption.PERSISTENT); + } result._pathCreated.addAll(res._pathCreated); RetCode rc = res._retCode; if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) { @@ -720,6 +743,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { */ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate, List<List<String>> pathsCreated, int options) { + return create(paths, records, needCreate, pathsCreated, options, ZkClient.TTL_NOT_SET); + } + + /** + * async create with TTL. give up on error other than NONODE + */ + ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, + boolean[] needCreate, List<List<String>> pathsCreated, int options, long ttl) { if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size() || (pathsCreated != null && pathsCreated.size() != paths.size())) { throw new IllegalArgumentException( @@ -747,7 +778,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { String path = paths.get(i); T record = records == null ? null : records.get(i); cbList[i] = new ZkAsyncCallbacks.CreateCallbackHandler(); - _zkClient.asyncCreate(path, record, mode, cbList[i]); + if (mode.isTTL()) { + _zkClient.asyncCreate(path, record, mode, ttl, cbList[i]); + } else { + _zkClient.asyncCreate(path, record, mode, cbList[i]); + } } List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null)); @@ -784,8 +819,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { if (failOnNoNode) { boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length); - ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList = - create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT); + ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList; + if (mode.isTTL()) { + parentCbList = create(parentPaths, null, needCreateParent, pathsCreated, options, ttl); + } else if (mode.isContainer()) { + parentCbList = + create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.CONTAINER); + } else { + parentCbList = + create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT); + } for (int i = 0; i < parentCbList.length; i++) { ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i]; if (parentCb == null) { @@ -812,6 +855,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { */ @Override public boolean[] createChildren(List<String> paths, List<T> records, int options) { + return createChildren(paths, records, options, ZkClient.TTL_NOT_SET); + } + + /** + * async create with TTL + * TODO: rename to create + */ + @Override + public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) { boolean[] success = new boolean[paths.size()]; CreateMode mode = AccessOption.getMode(options); @@ -829,7 +881,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { try { ZkAsyncCallbacks.CreateCallbackHandler[] cbList = - create(paths, records, needCreate, pathsCreated, options); + create(paths, records, needCreate, pathsCreated, options, ttl); for (int i = 0; i < cbList.length; i++) { ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i]; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index e16519958..6a635a5da 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.ZkClient; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; @@ -225,6 +226,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { @Override public boolean create(String path, T data, int options) { + return create(path, data, options, ZkClient.TTL_NOT_SET); + } + + @Override + public boolean create(String path, T data, int options, long ttl) { String clientPath = path; String serverPath = prependChroot(clientPath); @@ -233,7 +239,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { try { cache.lockWrite(); ZkBaseDataAccessor<T>.AccessResult result = - _baseAccessor.doCreate(serverPath, data, options); + _baseAccessor.doCreate(serverPath, data, options, ttl); boolean success = (result._retCode == RetCode.OK); updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT); @@ -245,7 +251,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { } // no cache - return _baseAccessor.create(serverPath, data, options); + return _baseAccessor.create(serverPath, data, options, ttl); } @Override @@ -426,6 +432,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { @Override public boolean[] createChildren(List<String> paths, List<T> records, int options) { + return createChildren(paths, records, options, ZkClient.TTL_NOT_SET); + } + + @Override + public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) { final int size = paths.size(); List<String> serverPaths = prependChroot(paths); @@ -438,7 +449,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { List<List<String>> pathsCreatedList = new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null)); ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = - _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options); + _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options, ttl); boolean[] success = new boolean[size]; for (int i = 0; i < size; i++) { @@ -456,7 +467,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { } // no cache - return _baseAccessor.createChildren(serverPaths, records, options); + return _baseAccessor.createChildren(serverPaths, records, options, ttl); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java index f473be806..0ce99d59e 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java @@ -210,6 +210,69 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase { System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); } + @Test + public void testSyncCreateWithTTL() { + System.setProperty("zookeeper.extendedTypesEnabled", "true"); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; + + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path = String.format("/%s/%s", _rootPath, "msg_0"); + ZNRecord record = new ZNRecord("msg_0"); + ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient); + + boolean success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL); + Assert.assertFalse(success); + long ttl = 1L; + success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl); + Assert.assertTrue(success); + ZNRecord getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getId(), "msg_0"); + + record.setSimpleField("key0", "value0"); + success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl); + Assert.assertFalse(success, "Should fail since node already exists"); + getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getSimpleFields().size(), 0); + + System.clearProperty("zookeeper.extendedTypesEnabled"); + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSyncCreateContainer() { + System.setProperty("zookeeper.extendedTypesEnabled", "true"); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; + + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path = String.format("/%s/%s", _rootPath, "msg_0"); + ZNRecord record = new ZNRecord("msg_0"); + ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient); + + boolean success = accessor.create(path, record, AccessOption.CONTAINER); + Assert.assertTrue(success); + ZNRecord getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getId(), "msg_0"); + + record.setSimpleField("key0", "value0"); + success = accessor.create(path, record, AccessOption.CONTAINER); + Assert.assertFalse(success, "Should fail since node already exists"); + getRecord = _gZkClient.readData(path); + Assert.assertNotNull(getRecord); + Assert.assertEquals(getRecord.getSimpleFields().size(), 0); + + System.clearProperty("zookeeper.extendedTypesEnabled"); + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); + } + @Test public void testDefaultAccessorCreateCustomData() { String className = TestHelper.getTestClassName(); @@ -513,6 +576,52 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase { Assert.assertEquals(record.getId(), msgId, "Should get what we created"); } + // test async createChildren with TTL + System.setProperty("zookeeper.extendedTypesEnabled", "true"); + records = new ArrayList<>(); + paths = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + paths.add(PropertyPathBuilder.instanceMessage(root, "host_2", msgId)); + records.add(new ZNRecord(msgId)); + } + success = accessor.createChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, 1L); + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + Assert.assertTrue(success[i], "Should succeed in create " + msgId); + } + + // test get what we created + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + String path = PropertyPathBuilder.instanceMessage(root, "host_2", msgId); + ZNRecord record = _gZkClient.readData(path); + Assert.assertEquals(record.getId(), msgId, "Should get what we created"); + } + + // test async createChildren with Container mode + records = new ArrayList<>(); + paths = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + paths.add(PropertyPathBuilder.instanceMessage(root, "host_3", msgId)); + records.add(new ZNRecord(msgId)); + } + success = accessor.createChildren(paths, records, AccessOption.CONTAINER); + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + Assert.assertTrue(success[i], "Should succeed in create " + msgId); + } + + // test get what we created + for (int i = 0; i < 10; i++) { + String msgId = "msg_" + i; + String path = PropertyPathBuilder.instanceMessage(root, "host_3", msgId); + ZNRecord record = _gZkClient.readData(path); + Assert.assertEquals(record.getId(), msgId, "Should get what we created"); + } + System.clearProperty("zookeeper.extendedTypesEnabled"); + // test async setChildren records = new ArrayList<>(); paths = new ArrayList<>(); diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java index e22fcc2d3..1567b98cc 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java @@ -67,6 +67,11 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { return set(path, record, options); } + @Override + public boolean create(String path, ZNRecord record, int options, long ttl) { + return set(path, record, options); + } + @Override public boolean set(String path, ZNRecord record, int options) { ZNode zNode = _recordMap.get(path); @@ -112,6 +117,12 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { return setChildren(paths, records, options); } + @Override + public boolean[] createChildren(List<String> paths, List<ZNRecord> records, + int options, long ttl) { + return setChildren(paths, records, options); + } + @Override public boolean[] setChildren(List<String> paths, List<ZNRecord> records, int options) { boolean [] ret = new boolean[paths.size()];
