This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3055f26aa Metaclient updater retry logic (#2805)
3055f26aa is described below

commit 3055f26aa4f040f549ca91581fc6b2ff6b275f33
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Fri Jun 21 16:01:09 2024 -0700

    Metaclient updater retry logic (#2805)
    
    Add retry logic to MetaClient Updater
---
 .../helix/metaclient/api/MetaClientInterface.java  |  15 +-
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  67 ++++++--
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 168 ++++++++++++++++++---
 3 files changed, 222 insertions(+), 28 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5403e7ca4..c8a1d0d36 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -196,13 +196,26 @@ public interface MetaClientInterface<T> {
 
   /**
    * Update existing data of a given key using an updater. This method will issue a read to get
-   * current data and apply updater upon the current data.
+   * current data and apply updater upon the current data. This method will NOT retry applying updated
+   * data upon failure.
    * @param key key to identify the entry
    * @param updater An updater that modifies the entry value.
    * @return the updated value.
    */
   T update(final String key, DataUpdater<T> updater);
 
+
+  /**
+   * Update existing data of a given key using an updater. This method will issue a read to get
+   * current data and apply updater upon the current data.
+   * @param key key to identify the entry
+   * @param updater An updater that modifies the entry value.
+   * @param retryOnFailure If true, updater should retry applying updated data upon failure.
+   * @param createIfAbsent If true, create the entry if it does not exist.
+   * @return the updated value.
+   */
+  T update(final String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent);
+
   /**
    * Check if there is an entry for the given key.
    * @param key key to identify the entry
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 16d28c6d7..f4594d5d6 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -19,6 +19,7 @@ package org.apache.helix.metaclient.impl.zk;
   * under the License.
   */
 
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -39,6 +40,7 @@ import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
 import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
@@ -207,16 +209,61 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public T update(String key, DataUpdater<T> updater) {
-    org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
-    // TODO: add retry logic for ZkBadVersionException.
-    try {
-      T oldData = _zkClient.readData(key, stat);
-      T newData = updater.update(oldData);
-      set(key, newData, stat.getVersion());
-      return newData;
-    } catch (ZkException e) {
-      throw translateZkExceptionToMetaclientException(e);
-    }
+    return update(key, updater, false, false);
+  }
+
+  @Override
+  public T update(String key, DataUpdater<T> updater, boolean retryOnFailure, boolean createIfAbsent) {
+    final int MAX_RETRY_ATTEMPTS = 3;
+    int retryAttempts = 0;
+    boolean retry;
+    T updatedData = null;
+    do {
+      retry = false;
+      retryAttempts++;
+      try {
+        ImmutablePair<T, Stat> tup = getDataAndStat(key);
+        Stat stat = tup.right;
+        T oldData = tup.left;
+        T newData = updater.update(oldData);
+        set(key, newData, stat.getVersion());
+        updatedData = newData;
+      } catch (MetaClientBadVersionException badVersionException) {
+        // If exceeded max retry attempts, re-throw exception
+        if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
+          LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
+          throw badVersionException;
+        }
+        // Retry on bad version
+        retry = true;
+      } catch (MetaClientNoNodeException noNodeException) {
+        if (!createIfAbsent) {
+          LOG.error("Failed to update node at {} as node does not exist. createIfAbsent was {}.", key, createIfAbsent);
+          throw noNodeException;
+        }
+        // If node does not exist, attempt to create it - pass null to updater
+        T newData = updater.update(null);
+        if (newData != null) {
+          try {
+            create(key, newData);
+            updatedData = newData;
+          // If parent node for key does not exist, then updater will immediately fail due to uncaught NoNodeException
+          } catch (MetaClientNodeExistsException nodeExistsException) {
+            // If exceeded max retry attempts, cast to ConcurrentModification exception and re-throw.
+            if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
+              LOG.error("Failed to update node at {} after {} attempts.", key, MAX_RETRY_ATTEMPTS);
+              throw new ConcurrentModificationException("Failed to update node at " + key + " after " +
+                  MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException);
+            }
+            // If node now exists, then retry update
+            retry = true;
+          } catch (ZkException e) {
+            throw translateZkExceptionToMetaclientException(e);
+          }
+        }
+      }
+    } while (retryOnFailure && retry);
+    return updatedData;
   }
 
   //TODO: Get Expiry Time in Stat
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index a5da69f2f..2919893ff 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -23,11 +23,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.helix.metaclient.api.ChildChangeListener;
 import org.apache.helix.metaclient.api.DataUpdater;
 import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.api.DirectChildChangeListener;
 
-import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -222,33 +221,168 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
     try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
       zkMetaClient.connect();
       int initValue = 3;
+      DataUpdater<Integer> updater = new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          return currentData != null ? currentData + 1 : initValue;
+        }
+      };
+
       zkMetaClient.create(key, initValue);
-      MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
-      Assert.assertEquals(entryStat.getVersion(), 0);
 
-      // test update() and validate entry value and version
-      Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+      // Test updater basic success
+      for (int i = 1; i < 3; i++) {
+        Integer newData = zkMetaClient.update(key, updater);
+        Assert.assertEquals((int) newData, initValue + i);
+        Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
+      }
+
+      // Cleanup
+      zkMetaClient.delete(key);
+    }
+  }
+
+  @Test
+  public void testUpdateWithRetry() throws InterruptedException {
+    final boolean RETRY_ON_FAILURE = true;
+    final boolean CREATE_IF_ABSENT = true;
+    final String key = "/TestZkMetaClient_testUpdateWithRetry";
+    ZkMetaClientConfig config =
+        new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+    try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
+      zkMetaClient.connect();
+      int initValue = 3;
+      // Basic updater that increments node value by 1, starting at initValue
+      DataUpdater<Integer> basicUpdater = new DataUpdater<Integer>() {
         @Override
         public Integer update(Integer currentData) {
-          return currentData + 1;
+          return currentData != null ? currentData + 1 : initValue;
         }
-      });
-      Assert.assertEquals((int) newData, (int) initValue + 1);
+      };
 
-      entryStat = zkMetaClient.exists(key);
-      Assert.assertEquals(entryStat.getVersion(), 1);
+      // Test updater fails create node if it doesn't exist when createIfAbsent is false
+      try {
+        zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false);
+        Assert.fail("Updater should have thrown error");
+      } catch (MetaClientNoNodeException e) {
+        Assert.assertFalse(zkMetaClient.exists(key) != null);
+      }
 
-      newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+      // Test updater fails when parent path does not exist
+      try {
+        zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+        Assert.fail("Updater should have thrown error");
+      } catch (MetaClientNoNodeException e) {
+        Assert.assertFalse(zkMetaClient.exists(key + "/child") != null);
+      }
 
+      // Test updater creates node if it doesn't exist when createIfAbsent is true
+      Integer newData = zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+      Assert.assertEquals((int) newData, initValue);
+      Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);
+
+      // Cleanup
+      zkMetaClient.delete(key);
+
+      AtomicBoolean latch = new AtomicBoolean();
+
+      // Increments znode version and sets latch value to true
+      DataUpdater<Integer> versionIncrementUpdater = new DataUpdater<Integer>() {
         @Override
         public Integer update(Integer currentData) {
-          return currentData + 1;
+          latch.set(true);
+          return currentData;
         }
-      });
+      };
 
-      entryStat = zkMetaClient.exists(key);
-      Assert.assertEquals(entryStat.getVersion(), 2);
-      Assert.assertEquals((int) newData, (int) initValue + 2);
+      // Reads znode, calls versionIncrementUpdater, fails to update due to bad version, then retries and should succeed
+      DataUpdater<Integer> failsOnceUpdater = new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          try {
+            while (!latch.get()) {
+              zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+            }
+            return currentData != null ? currentData + 1 : initValue;
+          } catch (MetaClientException e) {
+            return -1;
+          }
+        }
+      };
+
+      // Always fails to update due to bad version
+      DataUpdater<Integer> alwaysFailLatchedUpdater = new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          try {
+            latch.set(false);
+            while (!latch.get()) {
+              zkMetaClient.update(key, versionIncrementUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+            }
+            return currentData != null ? currentData + 1 : initValue;
+          } catch (MetaClientException e) {
+            return -1;
+          }
+        }
+      };
+
+      // Updater reads znode, sees it does not exist and attempts to create it, but should fail as znode already created
+      // due to create() call in updater. Should then retry and successfully update the node.
+      DataUpdater<Integer> failOnFirstCreateLatchedUpdater = new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          try {
+            if (!latch.get()) {
+              zkMetaClient.create(key, initValue);
+              latch.set(true);
+            }
+            return currentData != null ? currentData + 1 : initValue;
+          } catch (MetaClientException e) {
+            return -1;
+          }
+        }
+      };
+
+      // Throws error when update called
+      DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
+        @Override
+        public Integer update(Integer currentData) {
+          throw new RuntimeException("IGNORABLE: Test dataUpdater correctly throws exception");
+        }
+      };
+
+      // Reset latch
+      latch.set(false);
+      // Test updater retries on bad version
+      // Latched updater should read znode at version 0, but attempt to write to version 1 which fails. Should retry
+      // and increment version to 2
+      zkMetaClient.create(key, initValue);
+      zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+      Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
+      Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);
+
+      // Reset latch
+      latch.set(false);
+      // Test updater fails on retries exceeded
+      try {
+        zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+        Assert.fail("Updater should have thrown error");
+      } catch (MetaClientBadVersionException e) {}
+
+
+      // Test updater throws error
+      try {
+        zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+        Assert.fail("DataUpdater should have thrown error");
+      } catch (RuntimeException e) {}
+
+      // Reset latch and cleanup old node
+      latch.set(false);
+      zkMetaClient.delete(key);
+      // Test updater retries update if node does not exist on read, but then exists when updater attempts to create it
+      zkMetaClient.update(key, failOnFirstCreateLatchedUpdater, RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+      Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
+      Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
       zkMetaClient.delete(key);
     }
   }

Reply via email to