This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 3055f26aa Metaclient updater retry logic (#2805)
3055f26aa is described below
commit 3055f26aa4f040f549ca91581fc6b2ff6b275f33
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Fri Jun 21 16:01:09 2024 -0700
Metaclient updater retry logic (#2805)
Add retry logic to the MetaClient updater.
---
.../helix/metaclient/api/MetaClientInterface.java | 15 +-
.../helix/metaclient/impl/zk/ZkMetaClient.java | 67 ++++++--
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 168 ++++++++++++++++++---
3 files changed, 222 insertions(+), 28 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5403e7ca4..c8a1d0d36 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -196,13 +196,26 @@ public interface MetaClientInterface<T> {
/**
* Update existing data of a given key using an updater. This method will
issue a read to get
- * current data and apply updater upon the current data.
+ * current data and apply updater upon the current data. This method will
NOT retry applying updated
+ * data upon failure.
* @param key key to identify the entry
* @param updater An updater that modifies the entry value.
* @return the updated value.
*/
T update(final String key, DataUpdater<T> updater);
+
+ /**
+ * Update existing data of a given key using an updater. This method will
issue a read to get
+ * current data and apply updater upon the current data.
+ * @param key key to identify the entry
+ * @param updater An updater that modifies the entry value.
+ * @param retryOnFailure If true, updater should retry applying updated data
upon failure.
+ * @param createIfAbsent If true, create the entry if it does not exist.
+ * @return the updated value.
+ */
+ T update(final String key, DataUpdater<T> updater, boolean retryOnFailure,
boolean createIfAbsent);
+
/**
* Check if there is an entry for the given key.
* @param key key to identify the entry
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 16d28c6d7..f4594d5d6 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -19,6 +19,7 @@ package org.apache.helix.metaclient.impl.zk;
* under the License.
*/
+import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -39,6 +40,7 @@ import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
@@ -207,16 +209,61 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public T update(String key, DataUpdater<T> updater) {
- org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
- // TODO: add retry logic for ZkBadVersionException.
- try {
- T oldData = _zkClient.readData(key, stat);
- T newData = updater.update(oldData);
- set(key, newData, stat.getVersion());
- return newData;
- } catch (ZkException e) {
- throw translateZkExceptionToMetaclientException(e);
- }
+ return update(key, updater, false, false);
+ }
+
+ @Override
+ public T update(String key, DataUpdater<T> updater, boolean retryOnFailure,
boolean createIfAbsent) {
+ final int MAX_RETRY_ATTEMPTS = 3;
+ int retryAttempts = 0;
+ boolean retry;
+ T updatedData = null;
+ do {
+ retry = false;
+ retryAttempts++;
+ try {
+ ImmutablePair<T, Stat> tup = getDataAndStat(key);
+ Stat stat = tup.right;
+ T oldData = tup.left;
+ T newData = updater.update(oldData);
+ set(key, newData, stat.getVersion());
+ updatedData = newData;
+ } catch (MetaClientBadVersionException badVersionException) {
+ // If exceeded max retry attempts, re-throw exception
+ if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
+ LOG.error("Failed to update node at {} after {} attempts.", key,
MAX_RETRY_ATTEMPTS);
+ throw badVersionException;
+ }
+ // Retry on bad version
+ retry = true;
+ } catch (MetaClientNoNodeException noNodeException) {
+ if (!createIfAbsent) {
+ LOG.error("Failed to update node at {} as node does not exist.
createIfAbsent was {}.", key, createIfAbsent);
+ throw noNodeException;
+ }
+ // If node does not exist, attempt to create it - pass null to updater
+ T newData = updater.update(null);
+ if (newData != null) {
+ try {
+ create(key, newData);
+ updatedData = newData;
+ // If parent node for key does not exist, then updater will
immediately fail due to uncaught NoNodeException
+ } catch (MetaClientNodeExistsException nodeExistsException) {
+ // If exceeded max retry attempts, cast to ConcurrentModification
exception and re-throw.
+ if (retryAttempts >= MAX_RETRY_ATTEMPTS) {
+ LOG.error("Failed to update node at {} after {} attempts.", key,
MAX_RETRY_ATTEMPTS);
+ throw new ConcurrentModificationException("Failed to update node
at " + key + " after " +
+ MAX_RETRY_ATTEMPTS + " attempts.", nodeExistsException);
+ }
+ // If node now exists, then retry update
+ retry = true;
+ } catch (ZkException e) {
+ throw translateZkExceptionToMetaclientException(e);
+ }
+ }
+ }
+ } while (retryOnFailure && retry);
+ return updatedData;
}
//TODO: Get Expiry Time in Stat
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index a5da69f2f..2919893ff 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -23,11 +23,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
-import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -222,33 +221,168 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
zkMetaClient.connect();
int initValue = 3;
+ DataUpdater<Integer> updater = new DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ return currentData != null ? currentData + 1 : initValue;
+ }
+ };
+
zkMetaClient.create(key, initValue);
- MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
- Assert.assertEquals(entryStat.getVersion(), 0);
- // test update() and validate entry value and version
- Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+ // Test updater basic success
+ for (int i = 1; i < 3; i++) {
+ Integer newData = zkMetaClient.update(key, updater);
+ Assert.assertEquals((int) newData, initValue + i);
+ Assert.assertEquals(zkMetaClient.exists(key).getVersion(), i);
+ }
+
+ // Cleanup
+ zkMetaClient.delete(key);
+ }
+ }
+
+ @Test
+ public void testUpdateWithRetry() throws InterruptedException {
+ final boolean RETRY_ON_FAILURE = true;
+ final boolean CREATE_IF_ABSENT = true;
+ final String key = "/TestZkMetaClient_testUpdateWithRetry";
+ ZkMetaClientConfig config =
+ new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
+ try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
+ zkMetaClient.connect();
+ int initValue = 3;
+ // Basic updater that increments node value by 1, starting at initValue
+ DataUpdater<Integer> basicUpdater = new DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
- return currentData + 1;
+ return currentData != null ? currentData + 1 : initValue;
}
- });
- Assert.assertEquals((int) newData, (int) initValue + 1);
+ };
- entryStat = zkMetaClient.exists(key);
- Assert.assertEquals(entryStat.getVersion(), 1);
+ // Test updater fails create node if it doesn't exist when
createIfAbsent is false
+ try {
+ zkMetaClient.update(key, basicUpdater, RETRY_ON_FAILURE, false);
+ Assert.fail("Updater should have thrown error");
+ } catch (MetaClientNoNodeException e) {
+ Assert.assertFalse(zkMetaClient.exists(key) != null);
+ }
- newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
+ // Test updater fails when parent path does not exist
+ try {
+ zkMetaClient.update(key + "/child", basicUpdater, RETRY_ON_FAILURE,
CREATE_IF_ABSENT);
+ Assert.fail("Updater should have thrown error");
+ } catch (MetaClientNoNodeException e) {
+ Assert.assertFalse(zkMetaClient.exists(key + "/child") != null);
+ }
+ // Test updater creates node if it doesn't exist when createIfAbsent is
true
+ Integer newData = zkMetaClient.update(key, basicUpdater,
RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+ Assert.assertEquals((int) newData, initValue);
+ Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 0);
+
+ // Cleanup
+ zkMetaClient.delete(key);
+
+ AtomicBoolean latch = new AtomicBoolean();
+
+ // Increments znode version and sets latch value to true
+ DataUpdater<Integer> versionIncrementUpdater = new
DataUpdater<Integer>() {
@Override
public Integer update(Integer currentData) {
- return currentData + 1;
+ latch.set(true);
+ return currentData;
}
- });
+ };
- entryStat = zkMetaClient.exists(key);
- Assert.assertEquals(entryStat.getVersion(), 2);
- Assert.assertEquals((int) newData, (int) initValue + 2);
+ // Reads znode, calls versionIncrementUpdater, fails to update due to
bad version, then retries and should succeed
+ DataUpdater<Integer> failsOnceUpdater = new DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ try {
+ while (!latch.get()) {
+ zkMetaClient.update(key, versionIncrementUpdater,
RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+ }
+ return currentData != null ? currentData + 1 : initValue;
+ } catch (MetaClientException e) {
+ return -1;
+ }
+ }
+ };
+
+ // Always fails to update due to bad version
+ DataUpdater<Integer> alwaysFailLatchedUpdater = new
DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ try {
+ latch.set(false);
+ while (!latch.get()) {
+ zkMetaClient.update(key, versionIncrementUpdater,
RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+ }
+ return currentData != null ? currentData + 1 : initValue;
+ } catch (MetaClientException e) {
+ return -1;
+ }
+ }
+ };
+
+ // Updater reads znode, sees it does not exist and attempts to create
it, but should fail as znode already created
+ // due to create() call in updater. Should then retry and successfully
update the node.
+ DataUpdater<Integer> failOnFirstCreateLatchedUpdater = new
DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ try {
+ if (!latch.get()) {
+ zkMetaClient.create(key, initValue);
+ latch.set(true);
+ }
+ return currentData != null ? currentData + 1 : initValue;
+ } catch (MetaClientException e) {
+ return -1;
+ }
+ }
+ };
+
+ // Throws error when update called
+ DataUpdater<Integer> errorUpdater = new DataUpdater<Integer>() {
+ @Override
+ public Integer update(Integer currentData) {
+ throw new RuntimeException("IGNORABLE: Test dataUpdater correctly
throws exception");
+ }
+ };
+
+ // Reset latch
+ latch.set(false);
+ // Test updater retries on bad version
+ // Latched updater should read znode at version 0, but attempt to write
to version 1 which fails. Should retry
+ // and increment version to 2
+ zkMetaClient.create(key, initValue);
+ zkMetaClient.update(key, failsOnceUpdater, RETRY_ON_FAILURE,
CREATE_IF_ABSENT);
+ Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
+ Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 2);
+
+ // Reset latch
+ latch.set(false);
+ // Test updater fails on retries exceeded
+ try {
+ zkMetaClient.update(key, alwaysFailLatchedUpdater, RETRY_ON_FAILURE,
CREATE_IF_ABSENT);
+ Assert.fail("Updater should have thrown error");
+ } catch (MetaClientBadVersionException e) {}
+
+
+ // Test updater throws error
+ try {
+ zkMetaClient.update(key, errorUpdater, RETRY_ON_FAILURE,
CREATE_IF_ABSENT);
+ Assert.fail("DataUpdater should have thrown error");
+ } catch (RuntimeException e) {}
+
+ // Reset latch and cleanup old node
+ latch.set(false);
+ zkMetaClient.delete(key);
+ // Test updater retries update if node does not exist on read, but then
exists when updater attempts to create it
+ zkMetaClient.update(key, failOnFirstCreateLatchedUpdater,
RETRY_ON_FAILURE, CREATE_IF_ABSENT);
+ Assert.assertEquals((int) zkMetaClient.get(key), initValue + 1);
+ Assert.assertEquals(zkMetaClient.exists(key).getVersion(), 1);
zkMetaClient.delete(key);
}
}