This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b724aa06 Add ZK multi set with updater to ZKBaseDataAccesor (#2770)
7b724aa06 is described below

commit 7b724aa06ac3fb1c64db81892287f9864e44f5c2
Author: Zachary Pinto <[email protected]>
AuthorDate: Mon Mar 4 13:18:24 2024 -0800

    Add ZK multi set with updater to ZKBaseDataAccesor (#2770)
    
    * Add ZK multi set with updater to ZkBaseDataAccessor. Allows for version-based 
transactional sets.
    
    * Fix exception handling for multi set.
---
 .../java/org/apache/helix/BaseDataAccessor.java    | 13 ++++
 .../helix/manager/zk/ZkBaseDataAccessor.java       | 50 ++++++++++++++
 .../helix/manager/zk/TestZkBaseDataAccessor.java   | 79 ++++++++++++++++++++++
 3 files changed, 142 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java 
b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index 110b6810e..162ffb05d 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -20,7 +20,9 @@ package org.apache.helix;
  */
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -91,6 +93,17 @@ public interface BaseDataAccessor<T> {
    */
   boolean update(String path, DataUpdater<T> updater, int options);
 
+  /**
+   * Attempts to update the data at every given path, applying to each path
+   * the updater that is mapped to it.
+   * This should be used on existing ZNodes only.
+   * The default implementation performs no update; it always throws
+   * {@link NotImplementedException}, so implementations that support this
+   * operation must override it.
+   * @param updaterByPath updaters for each path to update, keyed by path
+   * @return true if all the updates succeeded, false otherwise
+   * @throws NotImplementedException if the implementing class does not override this method
+   */
+  default boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
+    throw new NotImplementedException("multiSet is not implemented");
+  }
+
   /**
    * This will remove the ZNode and all its descendants if any
    * @param path path to the root ZNode to remove
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0c7a94f1b..e074a22e6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -56,7 +56,10 @@ import 
org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import org.slf4j.Logger;
@@ -485,6 +488,53 @@ public class ZkBaseDataAccessor<T> implements 
BaseDataAccessor<T> {
     return result;
   }
 
+  /**
+   * Synchronous transactional set: runs {@code doMultiSet} for the given
+   * path-to-updater map and reports whether it finished with RetCode.OK.
+   * @param updaterByPath updaters keyed by the path they apply to
+   * @return true if the resulting return code is OK, false otherwise
+   */
+  @Override
+  public boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
+    return doMultiSet(updaterByPath)._retCode == RetCode.OK;
+  }
+
+  /**
+   * Builds one setData op per path and submits all of them together through a
+   * single {@code _zkClient.multi} call.
+   * <p>
+   * For each entry the current data and Stat are read, the updater is applied to
+   * the data, and, when the updater returns a non-null value, a setData op is
+   * queued with the version taken from the Stat that was passed to the read.
+   * Paths whose updater returns null contribute no op.
+   * <p>
+   * If the multi call throws ZkBadVersionException, the whole read/update/submit
+   * cycle is repeated from the start, so an updater may be invoked more than once.
+   * The loop itself places no limit on the number of retries. Any other exception,
+   * raised either while reading/updating or while submitting, is logged and
+   * reported as RetCode.ERROR.
+   * @param updaterByPath updaters keyed by the path they apply to
+   * @return an AccessResult whose _retCode is OK on success and ERROR on failure
+   */
+  private AccessResult doMultiSet(Map<String, DataUpdater<T>> updaterByPath) {
+    AccessResult result = new AccessResult();
+    boolean retry;
+    do {
+      retry = false;
+      // Start every attempt with an empty op list so a retry never reuses ops from a previous pass.
+      List<Op> ops = new ArrayList<>();
+      try {
+        for (Map.Entry<String, DataUpdater<T>> entry : 
updaterByPath.entrySet()) {
+          String path = entry.getKey();
+          DataUpdater<T> updater = entry.getValue();
+          Stat readStat = new Stat();
+          T oldData = (T) _zkClient.readData(path, readStat);
+          T newData = updater.update(oldData);
+          // A null result from the updater means no op is queued for this path.
+          if (newData != null) {
+            // The version from readStat is passed along as the expected version of the write.
+            ops.add(Op.setData(path, _zkClient.serialize(newData, path), 
readStat.getVersion()));
+          }
+        }
+      } catch (Exception e1) {
+        // Any failure while reading or running an updater aborts without retrying.
+        LOG.error("Exception while reading paths: " + updaterByPath.keySet(), 
e1);
+        result._retCode = RetCode.ERROR;
+        return result;
+      }
+
+      try {
+        _zkClient.multi(ops);
+      } catch (ZkBadVersionException e) {
+        // Version mismatch reported by multi: rebuild the ops from fresh reads and try again.
+        retry = true;
+      } catch (Exception e1) {
+        LOG.error("Exception while updating paths: " + updaterByPath.keySet(), 
e1);
+        result._retCode = RetCode.ERROR;
+        return result;
+      }
+    } while (retry);
+
+    result._retCode = RetCode.OK;
+    return result;
+  }
+
   /**
    * sync get
    */
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index 8d9cd29d9..bf4f342dd 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
@@ -419,6 +420,84 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase 
{
     System.out.println("END " + testName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
+  /**
+   * Creates two ZNodes, updates both through a single multiSet call, and verifies
+   * that each node keeps its id and carries the field set by its own updater.
+   */
+  @Test
+  public void testSyncMultiSet() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new 
Date(System.currentTimeMillis()));
+
+    String path1 = String.format("/%s/%s", _rootPath, "foo");
+    String path2 = String.format("/%s/%s", _rootPath, "bar");
+    ZkBaseDataAccessor<ZNRecord> accessor = new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    // Both target nodes are created before multiSet is called.
+    accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);
+    accessor.create(path2, new ZNRecord("bar"), AccessOption.PERSISTENT);
+
+    // path1's updater sets a map field; path2's updater sets a simple field.
+    boolean success = accessor.multiSet(Map.of(path1, new 
DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        currentData.setMapField("key", Map.of("key1", "value1"));
+        return currentData;
+      }
+    }, path2, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        currentData.setSimpleField("key", "value");
+        return currentData;
+      }
+    }));
+    Assert.assertTrue(success);
+    // Read both nodes back directly through the ZkClient to check what was written.
+    ZNRecord getRecord1 = _gZkClient.readData(path1);
+    ZNRecord getRecord2 = _gZkClient.readData(path2);
+
+    Assert.assertNotNull(getRecord1);
+    Assert.assertEquals(getRecord1.getId(), "foo");
+    Assert.assertEquals(getRecord1.getMapField("key"), Map.of("key1", 
"value1"));
+
+    Assert.assertNotNull(getRecord2);
+    Assert.assertEquals(getRecord2.getId(), "bar");
+    Assert.assertEquals(getRecord2.getSimpleField("key"), "value");
+
+    System.out.println("END " + testName + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Calls multiSet with two paths of which only path1 is created by this test,
+   * and verifies that the call reports failure and that the node at path1 does
+   * not carry the map field its updater would have set.
+   */
+  @Test
+  public void testSyncMultiSetOneRecordNoExist() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new 
Date(System.currentTimeMillis()));
+
+    String path1 = String.format("/%s/%s", _rootPath, "foo");
+    String path2 = String.format("/%s/%s", _rootPath, "bar");
+    ZkBaseDataAccessor<ZNRecord> accessor = new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    // Only path1 is created; path2 is deliberately not created here.
+    accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);
+
+    boolean success = accessor.multiSet(Map.of(path1, new 
DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        currentData.setMapField("key", Map.of("key1", "value1"));
+        return currentData;
+      }
+    }, path2, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        currentData.setSimpleField("key", "value");
+        return currentData;
+      }
+    }));
+    Assert.assertFalse(success);
+    ZNRecord getRecord1 = _gZkClient.readData(path1);
+
+    Assert.assertNotNull(getRecord1);
+    Assert.assertEquals(getRecord1.getId(), "foo");
+    // Compare by value: a reference check against a freshly built Map can never fail,
+    // so it would not detect path1 having been updated despite the failed multiSet.
+    Assert.assertFalse(Map.of("key1", 
"value1").equals(getRecord1.getMapField("key")));
+
+    System.out.println("END " + testName + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
   @Test
   public void testSyncRemove() {
     String className = TestHelper.getTestClassName();

Reply via email to