This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 57741e4c6dcb24277da3f7785ffad02d16e8a7fc Author: xyuanlu <[email protected]> AuthorDate: Mon Feb 6 22:27:01 2023 -0800 Fix ZkClient retry logic for customized callback and test Fix ZkClient retry logic for customized callback and test --- .../zkclient/callback/ZkAsyncCallbacks.java | 26 +++- .../impl/client/TestZkClientAsyncRetry.java | 168 ++++++++++++++++++++- 2 files changed, 191 insertions(+), 3 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java index 72e2b95b7..99d9719df 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java @@ -54,6 +54,14 @@ public class ZkAsyncCallbacks { callback(rc, path, ctx); } + public Stat getStat() { + return _stat; + } + + public byte[] getData() { + return _data; + } + @Override public void handle() { // TODO Auto-generated method stub @@ -61,7 +69,7 @@ public class ZkAsyncCallbacks { @Override protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) { - if(rc != Code.NONODE.intValue()) { + if (rc != Code.NONODE.intValue()) { monitor.recordFailure(path); } } @@ -99,6 +107,10 @@ public class ZkAsyncCallbacks { callback(rc, path, ctx); } + public Stat getStat() { + return _stat; + } + @Override public void handle() { // TODO Auto-generated method stub @@ -106,7 +118,7 @@ public class ZkAsyncCallbacks { @Override protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) { - if(rc != Code.NONODE.intValue()) { + if (rc != Code.NONODE.intValue()) { monitor.recordFailure(path); } } @@ -182,6 +194,7 @@ public class ZkAsyncCallbacks { public static abstract class DefaultCallback implements CancellableZkAsyncCallback { AtomicBoolean _isOperationDone = new 
AtomicBoolean(false); int _rc = KeeperException.Code.APIERROR.intValue(); + String _path; public void callback(int rc, String path, Object ctx) { if (rc != 0 && LOG.isDebugEnabled()) { @@ -198,12 +211,14 @@ public class ZkAsyncCallbacks { } _rc = rc; + _path = path; // If retry is requested by passing the retry callback context, do retry if necessary. if (needRetry(rc)) { if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) { try { if (((ZkAsyncRetryCallContext) ctx).requestRetry()) { + LOG.info("Received {} for async request on path {}, requested retry.", rc, path); // The retry operation will be done asynchronously. Once it is done, the same callback // handler object shall be triggered to ensure the result is notified to the right // caller(s). @@ -225,6 +240,8 @@ public class ZkAsyncCallbacks { // If operation is done successfully or no retry needed, notify the caller(s). try { handle(); + } catch (Exception ex) { + LOG.error("Exception while handling user callback for path {}.", _path, ex); } finally { markOperationDone(); } @@ -259,9 +276,14 @@ public class ZkAsyncCallbacks { return _rc; } + public String getPath() { + return _path; + } + @Override public void notifyCallers() { LOG.warn("The callback {} has been cancelled.", this); + handle(); markOperationDone(); } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java index e55f8e6a9..ccbbde0d7 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java @@ -238,11 +238,89 @@ public class TestZkClientAsyncRetry extends ZkTestBase { } } + @Test(dependsOnMethods = "testAsyncWriteRetry") + public void testAsyncRetryCustomizedCallback() throws JMException { + // int array to store customized async callback return 
value. Initial value set to 100, which + // is not used by any ZK return code. + final int[] _returnCode = new int[2]; + _returnCode[0] = 100; + _returnCode[1] = 100; + + // Define Customized callback + class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler { + @Override + public void handle() { + _returnCode[0] = getRc(); + } + } + + MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); + try { + ZNRecord tmpRecord = new ZNRecord("tmpRecord"); + tmpRecord.setSimpleField("foo", "bar"); + testZkClient.createPersistent(NODE_PATH, tmpRecord); + + // 1. Test async set retry + CustomizedSetCallback setCallback = + new CustomizedSetCallback(); + Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue()); + + tmpRecord.setSimpleField("test", "data"); + testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); + // Async set will be pending because the mock error rc is retryable. + testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback); + Assert.assertFalse(setCallback.isOperationDone()); + Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue()); + // handle() hasn't been called until retry finished or canceled, assert it is still the default value. + Assert.assertEquals(_returnCode[0], 100); + // Change the mock return code. + testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); + // Async retry will succeed now. Wait until the operation is successfully done and verify. + Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS)); + Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue()); + // handle() called when retry finished, check return value. + Assert.assertEquals(_returnCode[0], KeeperException.Code.OK.intValue()); + Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); + + // 2. 
Test async delete + class CustomizedDeleteCallback extends ZkAsyncCallbacks.DeleteCallbackHandler{ + @Override + public void handle() { + _returnCode[1] = getRc(); + } + } + CustomizedDeleteCallback deleteCallback = + new CustomizedDeleteCallback(); + Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.APIERROR.intValue()); + + testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); + // Async delete will be pending because the mock error rc is retryable. + testZkClient.asyncDelete(NODE_PATH, deleteCallback); + Assert.assertFalse(deleteCallback.isOperationDone()); + Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue()); + // handle() hasn't been called until retry finished or canceled, assert it is still the default value. + Assert.assertEquals(_returnCode[1], 100); + // Change the mock return code. + testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); + // Async retry will succeed now. Wait until the operation is successfully done and verify. + Assert.assertTrue(waitAsyncOperation(deleteCallback, RETRY_OPS_WAIT_TIMEOUT_MS)); + Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue()); + Assert.assertFalse(testZkClient.exists(NODE_PATH)); + Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); + // handle() called when retry finished, check return value. + Assert.assertEquals(_returnCode[1], KeeperException.Code.OK.intValue()); + } finally { + testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); + testZkClient.close(); + _zkClient.delete(NODE_PATH); + } + } + /* * Tests if exception is thrown during retry operation, * the context should be cancelled correctly. 
*/ - @Test(dependsOnMethods = "testAsyncWriteRetry") + @Test(dependsOnMethods = "testAsyncRetryCustomizedCallback") public void testAsyncWriteRetryThrowException() throws JMException { MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); try { @@ -305,7 +383,95 @@ public class TestZkClientAsyncRetry extends ZkTestBase { } } + /* + * Test handle() is executed once if callback retry is canceled. + */ @Test(dependsOnMethods = "testAsyncWriteRetryThrowException") + public void testAsyncRetryCustomizedCallbackCancel() throws JMException { + // int array to store customized async callback return value. Initial value set to 100, which + // is not used by any ZK return code. + final int[] _returnCode = new int[2]; + _returnCode[0] = 100; + _returnCode[1] = 100; + + // Define Customized callback + class CustomizedCreateCallback extends ZkAsyncCallbacks.CreateCallbackHandler { + @Override + public void handle() { + _returnCode[0] = getRc(); + } + } + + MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); + try { + ZNRecord tmpRecord = new ZNRecord("tmpRecord"); + tmpRecord.setSimpleField("foo", "bar"); + testZkClient.createPersistent(NODE_PATH, tmpRecord); + + // 1. Test async create retry + CustomizedCreateCallback createCallback = + new CustomizedCreateCallback(); + Assert.assertEquals(createCallback.getRc(), KeeperException.Code.APIERROR.intValue()); + + tmpRecord.setSimpleField("test", "data"); + testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); + // Async create will be pending because the mock error rc is retryable. + testZkClient.asyncCreate(NODE_PATH, tmpRecord, CreateMode.PERSISTENT, createCallback); + Assert.assertFalse(createCallback.isOperationDone()); + // Original callback should have return code set to CONNECTIONLOSS + Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue()); + // handle() hasn't been called until retry finished or canceled, assert it is still the default value. 
+ Assert.assertEquals(_returnCode[0], 100); + // Throw exception in retry + testZkClient.setZkExceptionInRetry(true); + // Async retry will now throw and the callback will be canceled. Wait until the operation is done and verify. + Assert.assertTrue(waitAsyncOperation(createCallback, RETRY_OPS_WAIT_TIMEOUT_MS), + "Async callback should have been canceled"); + Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue()); + Assert.assertEquals(_returnCode[0], CONNECTIONLOSS.intValue()); + Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); + + // Restore the state + testZkClient.setZkExceptionInRetry(false); + + class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler { + @Override + public void handle() { + _returnCode[1] = getRc(); + } + } + + // 2. Test async set retry + CustomizedSetCallback setCallback = + new CustomizedSetCallback(); + Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue()); + + tmpRecord.setSimpleField("test", "data"); + testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); + // Async set will be pending because the mock error rc is retryable. + testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback); + Assert.assertFalse(setCallback.isOperationDone()); + // Original callback should have return code set to CONNECTIONLOSS + Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue()); + // handle() hasn't been called until retry finished or canceled, assert it is still the default value. + Assert.assertEquals(_returnCode[1], 100); + // Throw exception in retry + testZkClient.setZkExceptionInRetry(true); + // Async retry will now throw and the callback will be canceled. Wait until the operation is done and verify. 
+ Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS), + "Async callback should have been canceled"); + Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue()); + Assert.assertEquals(_returnCode[1], CONNECTIONLOSS.intValue()); + Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); + + } finally { + testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); + testZkClient.close(); + _zkClient.delete(NODE_PATH); + } + } + + @Test(dependsOnMethods = "testAsyncRetryCustomizedCallbackCancel") public void testAsyncReadRetry() throws JMException { MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); try {
