This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 7b724aa06 Add ZK multi set with updater to ZKBaseDataAccesor (#2770)
7b724aa06 is described below
commit 7b724aa06ac3fb1c64db81892287f9864e44f5c2
Author: Zachary Pinto <[email protected]>
AuthorDate: Mon Mar 4 13:18:24 2024 -0800
Add ZK multi set with updater to ZKBaseDataAccesor (#2770)
* Add ZK multi set with updated to ZKBaseAccesor. Allows for version based
transactional sets.
* Fix exception handling for multi set.
---
.../java/org/apache/helix/BaseDataAccessor.java | 13 ++++
.../helix/manager/zk/ZkBaseDataAccessor.java | 50 ++++++++++++++
.../helix/manager/zk/TestZkBaseDataAccessor.java | 79 ++++++++++++++++++++++
3 files changed, 142 insertions(+)
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index 110b6810e..162ffb05d 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -20,7 +20,9 @@ package org.apache.helix;
*/
import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -91,6 +93,17 @@ public interface BaseDataAccessor<T> {
*/
boolean update(String path, DataUpdater<T> updater, int options);
+ /**
+ * This will attempt to update the data using the updater using
+ * each updater for the corresponding path.
+ * This should be used on existing ZNodes only.
+ * @param updaterByPath updaters for each path to update
+ * @return true if all the updates succeeded, false otherwise
+ */
+ default boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
+ throw new NotImplementedException("multiSet is not implemented");
+ }
+
/**
* This will remove the ZNode and all its descendants if any
* @param path path to the root ZNode to remove
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0c7a94f1b..e074a22e6 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -56,7 +56,10 @@ import
org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
@@ -485,6 +488,53 @@ public class ZkBaseDataAccessor<T> implements
BaseDataAccessor<T> {
return result;
}
+ /**
+ * transactional sync set
+ */
+ @Override
+ public boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
+ AccessResult result = doMultiSet(updaterByPath);
+ return result._retCode == RetCode.OK;
+ }
+
+ private AccessResult doMultiSet(Map<String, DataUpdater<T>> updaterByPath) {
+ AccessResult result = new AccessResult();
+ boolean retry;
+ do {
+ retry = false;
+ List<Op> ops = new ArrayList<>();
+ try {
+ for (Map.Entry<String, DataUpdater<T>> entry :
updaterByPath.entrySet()) {
+ String path = entry.getKey();
+ DataUpdater<T> updater = entry.getValue();
+ Stat readStat = new Stat();
+ T oldData = (T) _zkClient.readData(path, readStat);
+ T newData = updater.update(oldData);
+ if (newData != null) {
+ ops.add(Op.setData(path, _zkClient.serialize(newData, path),
readStat.getVersion()));
+ }
+ }
+ } catch (Exception e1) {
+ LOG.error("Exception while reading paths: " + updaterByPath.keySet(),
e1);
+ result._retCode = RetCode.ERROR;
+ return result;
+ }
+
+ try {
+ _zkClient.multi(ops);
+ } catch (ZkBadVersionException e) {
+ retry = true;
+ } catch (Exception e1) {
+ LOG.error("Exception while updating paths: " + updaterByPath.keySet(),
e1);
+ result._retCode = RetCode.ERROR;
+ return result;
+ }
+ } while (retry);
+
+ result._retCode = RetCode.OK;
+ return result;
+ }
+
/**
* sync get
*/
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index 8d9cd29d9..bf4f342dd 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
@@ -419,6 +420,84 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase
{
System.out.println("END " + testName + " at " + new
Date(System.currentTimeMillis()));
}
+ @Test
+ public void testSyncMultiSet() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new
Date(System.currentTimeMillis()));
+
+ String path1 = String.format("/%s/%s", _rootPath, "foo");
+ String path2 = String.format("/%s/%s", _rootPath, "bar");
+ ZkBaseDataAccessor<ZNRecord> accessor = new
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);
+ accessor.create(path2, new ZNRecord("bar"), AccessOption.PERSISTENT);
+
+ boolean success = accessor.multiSet(Map.of(path1, new
DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ currentData.setMapField("key", Map.of("key1", "value1"));
+ return currentData;
+ }
+ }, path2, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ currentData.setSimpleField("key", "value");
+ return currentData;
+ }
+ }));
+ Assert.assertTrue(success);
+ ZNRecord getRecord1 = _gZkClient.readData(path1);
+ ZNRecord getRecord2 = _gZkClient.readData(path2);
+
+ Assert.assertNotNull(getRecord1);
+ Assert.assertEquals(getRecord1.getId(), "foo");
+ Assert.assertEquals(getRecord1.getMapField("key"), Map.of("key1",
"value1"));
+
+ Assert.assertNotNull(getRecord2);
+ Assert.assertEquals(getRecord2.getId(), "bar");
+ Assert.assertEquals(getRecord2.getSimpleField("key"), "value");
+
+ System.out.println("END " + testName + " at " + new
Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSyncMultiSetOneRecordNoExist() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new
Date(System.currentTimeMillis()));
+
+ String path1 = String.format("/%s/%s", _rootPath, "foo");
+ String path2 = String.format("/%s/%s", _rootPath, "bar");
+ ZkBaseDataAccessor<ZNRecord> accessor = new
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);
+
+ boolean success = accessor.multiSet(Map.of(path1, new
DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ currentData.setMapField("key", Map.of("key1", "value1"));
+ return currentData;
+ }
+ }, path2, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ currentData.setSimpleField("key", "value");
+ return currentData;
+ }
+ }));
+ Assert.assertFalse(success);
+ ZNRecord getRecord1 = _gZkClient.readData(path1);
+
+ Assert.assertNotNull(getRecord1);
+ Assert.assertEquals(getRecord1.getId(), "foo");
+ Assert.assertNotSame(getRecord1.getMapField("key"), Map.of("key1",
"value1"));
+
+ System.out.println("END " + testName + " at " + new
Date(System.currentTimeMillis()));
+ }
+
@Test
public void testSyncRemove() {
String className = TestHelper.getTestClassName();