This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 370e277966f75a7fba45f5b96f7608c127b2905c Author: Junkai Xue <[email protected]> AuthorDate: Mon May 4 13:02:13 2020 -0700 Revert "Add async call retry to resolve the transient ZK connection issue. (#970)" This reverts commit 96ebb27c23004a7a69dc4799b14586ff82d53c9e. --- .../apache/helix/zookeeper/zkclient/ZkClient.java | 96 ++--- .../callback/CancellableZkAsyncCallback.java | 8 - .../callback/ZkAsyncCallMonitorContext.java | 46 --- .../zkclient/callback/ZkAsyncCallbacks.java | 153 +++----- .../zkclient/callback/ZkAsyncRetryCallContext.java | 49 --- .../zkclient/callback/ZkAsyncRetryThread.java | 57 --- .../apache/helix/zookeeper/impl/ZkTestBase.java | 2 +- .../impl/client/TestZkClientAsyncRetry.java | 405 --------------------- 8 files changed, 81 insertions(+), 735 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 89f9e32..562143f 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -27,10 +27,7 @@ import javax.management.JMException; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.zkclient.annotation.PreFetch; -import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; -import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext; -import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; @@ -99,10 +96,6 @@ public class ZkClient implements Watcher { private PathBasedZkSerializer _pathBasedZkSerializer; private ZkClientMonitor _monitor; - // To automatically retry the async operation, we need a separate thread other than the - // ZkEventThread. Otherwise the retry request might block the normal event processing. - protected final ZkAsyncRetryThread _asyncCallRetryThread; - private class IZkDataListenerEntry { final IZkDataListener _dataListener; final boolean _prefetchData; @@ -190,9 +183,6 @@ public class ZkClient implements Watcher { _operationRetryTimeoutInMillis = operationRetryTimeout; _isNewSessionEventFired = false; - _asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers()); - _asyncCallRetryThread.start(); - connect(connectionTimeout, this); // initiate monitor @@ -1746,23 +1736,15 @@ public class ZkClient implements Watcher { data = (datat == null ? null : serialize(datat, path)); } catch (ZkMarshallingError e) { cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, - new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null); return; } - doAsyncCreate(path, data, mode, startT, cb); - } - - private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, - final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) { retryUntilConnected(() -> { ((ZkConnection) getConnection()).getZookeeper() - .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) { - @Override - protected void doRetry() { - doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb); - } - }); + .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, + // Arrays.asList(DEFAULT_ACL), + mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + data == null ? 0 : data.length, false)); return null; }); } @@ -1776,66 +1758,50 @@ public class ZkClient implements Watcher { data = serialize(datat, path); } catch (ZkMarshallingError e) { cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, - new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null); return; } - doAsyncSetData(path, data, version, startT, cb); - } - - private void doAsyncSetData(final String path, byte[] data, final int version, final long startT, - final ZkAsyncCallbacks.SetDataCallbackHandler cb) { retryUntilConnected(() -> { ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, - data == null ? 0 : data.length, false) { - @Override - protected void doRetry() { - doAsyncSetData(path, data, version, System.currentTimeMillis(), cb); - } - }); + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + data == null ? 0 : data.length, false)); return null; }); } public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) { final long startT = System.currentTimeMillis(); - retryUntilConnected(() -> { - ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) { - @Override - protected void doRetry() { - asyncGetData(path, cb); - } - }); - return null; + retryUntilConnected(new Callable<Object>() { + @Override + public Object call() throws Exception { + ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); + return null; + } }); } public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) { final long startT = System.currentTimeMillis(); - retryUntilConnected(() -> { - ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) { - @Override - protected void doRetry() { - asyncExists(path, cb); - } - }); - return null; + retryUntilConnected(new Callable<Object>() { + @Override + public Object call() throws Exception { + ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); + return null; + } }); } public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) { final long startT = System.currentTimeMillis(); - retryUntilConnected(() -> { - ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) { - @Override - protected void doRetry() { - asyncDelete(path, cb); - } - }); - return null; + retryUntilConnected(new Callable<Object>() { + @Override + public Object call() throws Exception { + ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false)); + return null; + } }); } @@ -1989,10 +1955,6 @@ public class ZkClient implements Watcher { return; } setShutdownTrigger(true); - if (_asyncCallRetryThread != null) { - _asyncCallRetryThread.interrupt(); - _asyncCallRetryThread.join(2000); - } _eventThread.interrupt(); _eventThread.join(2000); if (isManagingZkConnection()) { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java deleted file mode 100644 index 27d92e8..0000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.helix.zookeeper.zkclient.callback; - -public interface CancellableZkAsyncCallback { - /** - * Notify all the callers that are waiting for the callback to cancel the wait. - */ - void notifyCallers(); -} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java deleted file mode 100644 index bf2fd44..0000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.apache.helix.zookeeper.zkclient.callback; - -import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor; - -public class ZkAsyncCallMonitorContext { - private final long _startTimeMilliSec; - private final ZkClientMonitor _monitor; - private final boolean _isRead; - private int _bytes; - - /** - * @param monitor ZkClient monitor for update the operation result. - * @param startTimeMilliSec Operation initialization time. - * @param bytes The data size in bytes that is involved in the operation. - * @param isRead True if the operation is readonly. - */ - public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes, - boolean isRead) { - _monitor = monitor; - _startTimeMilliSec = startTimeMilliSec; - _bytes = bytes; - _isRead = isRead; - } - - /** - * Update the operated data size in bytes. - * @param bytes - */ - void setBytes(int bytes) { - _bytes = bytes; - } - - /** - * Record the operation result into the specified ZkClient monitor. - * @param path - */ - void recordAccess(String path) { - if (_monitor != null) { - if (_isRead) { - _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ); - } else { - _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE); - } - } - } -} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java index 70dbab4..04c4058 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java @@ -31,35 +31,41 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class ZkAsyncCallbacks { private static Logger LOG = LoggerFactory.getLogger(ZkAsyncCallbacks.class); - public static final int UNKNOWN_RET_CODE = 255; public static class GetDataCallbackHandler extends DefaultCallback implements DataCallback { public byte[] _data; public Stat _stat; @Override + public void handle() { + // TODO Auto-generated method stub + } + + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (rc == 0) { _data = data; _stat = stat; // update ctx with data size - if (_data != null && ctx != null && ctx instanceof ZkAsyncCallMonitorContext) { - ((ZkAsyncCallMonitorContext) ctx).setBytes(_data.length); + if (_data != null && ctx != null && ctx instanceof ZkAsyncCallContext) { + ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx; + zkCtx._bytes = _data.length; } } callback(rc, path, ctx); } + } + + public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback { + Stat _stat; @Override public void handle() { // TODO Auto-generated method stub } - } - - public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback { - Stat _stat; @Override public void processResult(int rc, String path, Object ctx, Stat stat) { @@ -72,15 +78,15 @@ public class ZkAsyncCallbacks { public Stat getStat() { return _stat; } + } + + public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback { + public Stat _stat; @Override public void handle() { // TODO Auto-generated method stub } - } - - public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback { - public Stat _stat; @Override public void processResult(int rc, String path, Object ctx, Stat stat) { @@ -89,11 +95,6 @@ public class ZkAsyncCallbacks { } callback(rc, path, ctx); } - - @Override - public void handle() { - // TODO Auto-generated method stub - } } public static class CreateCallbackHandler extends DefaultCallback implements StringCallback { @@ -121,66 +122,44 @@ public class ZkAsyncCallbacks { } /** - * Default callback for zookeeper async api. + * Default callback for zookeeper async api */ - public static abstract class DefaultCallback implements CancellableZkAsyncCallback { - AtomicBoolean _isOperationDone = new AtomicBoolean(false); - int _rc = UNKNOWN_RET_CODE; + public static abstract class DefaultCallback { + AtomicBoolean _lock = new AtomicBoolean(false); + int _rc = -1; public void callback(int rc, String path, Object ctx) { if (rc != 0 && LOG.isDebugEnabled()) { LOG.debug(this + ", rc:" + Code.get(rc) + ", path: " + path); } - if (ctx != null && ctx instanceof ZkAsyncCallMonitorContext) { - ((ZkAsyncCallMonitorContext) ctx).recordAccess(path); - } - - _rc = rc; - - // If retry is requested by passing the retry callback context, do retry if necessary. - if (needRetry(rc)) { - if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) { - try { - if (((ZkAsyncRetryCallContext) ctx).requestRetry()) { - // The retry operation will be done asynchronously. Once it is done, the same callback - // handler object shall be triggered to ensure the result is notified to the right - // caller(s). - return; - } else { - LOG.warn( - "Cannot request to retry the operation. The retry request thread may have been stopped."); - } - } catch (Throwable t) { - LOG.error("Failed to request to retry the operation.", t); + if (ctx != null && ctx instanceof ZkAsyncCallContext) { + ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx; + if (zkCtx._monitor != null) { + if (zkCtx._isRead) { + zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec, + ZkClientMonitor.AccessType.READ); + } else { + zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec, + ZkClientMonitor.AccessType.WRITE); } - } else { - LOG.warn( - "The provided callback context {} is not ZkAsyncRetryCallContext. Skip retrying.", - ctx.getClass().getName()); } } - // If operation is done successfully or no retry needed, notify the caller(s). - try { - handle(); - } finally { - markOperationDone(); - } - } + _rc = rc; + handle(); - public boolean isOperationDone() { - return _isOperationDone.get(); + synchronized (_lock) { + _lock.set(true); + _lock.notify(); + } } - /** - * The blocking call that return true once the operation has been completed without retrying. - */ public boolean waitForSuccess() { try { - synchronized (_isOperationDone) { - while (!_isOperationDone.get()) { - _isOperationDone.wait(); + synchronized (_lock) { + while (!_lock.get()) { + _lock.wait(); } } } catch (InterruptedException e) { @@ -193,52 +172,22 @@ public class ZkAsyncCallbacks { return _rc; } - @Override - public void notifyCallers() { - LOG.warn("The callback {} has been cancelled.", this); - markOperationDone(); - } - - /** - * Additional callback handling. - */ abstract public void handle(); + } - private void markOperationDone() { - synchronized (_isOperationDone) { - _isOperationDone.set(true); - _isOperationDone.notifyAll(); - } - } + public static class ZkAsyncCallContext { + private long _startTimeMilliSec; + private int _bytes; + private ZkClientMonitor _monitor; + private boolean _isRead; - /** - * @param rc the return code - * @return true if the error is transient and the operation may succeed when being retried. - */ - private boolean needRetry(int rc) { - try { - switch (Code.get(rc)) { - /** Connection to the server has been lost */ - case CONNECTIONLOSS: - /** The session has been expired by the server */ - case SESSIONEXPIRED: - /** Session moved to another server, so operation is ignored */ - case SESSIONMOVED: - return true; - default: - return false; - } - } catch (ClassCastException | NullPointerException ex) { - LOG.error("Failed to handle unknown return code {}. Skip retrying.", rc, ex); - return false; - } + public ZkAsyncCallContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes, + boolean isRead) { + _monitor = monitor; + _startTimeMilliSec = startTimeMilliSec; + _bytes = bytes; + _isRead = isRead; } } - @Deprecated - public static class ZkAsyncCallContext extends ZkAsyncCallMonitorContext { - ZkAsyncCallContext(ZkClientMonitor monitor, long startTimeMilliSec, int bytes, boolean isRead) { - super(monitor, startTimeMilliSec, bytes, isRead); - } - } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java deleted file mode 100644 index 4a9402f..0000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.apache.helix.zookeeper.zkclient.callback; - -import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class ZkAsyncRetryCallContext extends ZkAsyncCallMonitorContext { - private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryCallContext.class); - private final ZkAsyncRetryThread _retryThread; - private final CancellableZkAsyncCallback _cancellableCallback; - - /** - * @param retryThread The thread that executes the retry operation. - * Note that retry in the ZkEventThread is not allowed to avoid dead lock. - * @param callback Cancellable asynchronous callback to notify when the retry is cancelled. - * @param monitor ZkClient monitor for update the operation result. - * @param startTimeMilliSec Operation initialization time. - * @param bytes The data size in bytes that is involved in the operation. - * @param isRead True if the operation is readonly. - */ - public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread, - final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor, - long startTimeMilliSec, int bytes, boolean isRead) { - super(monitor, startTimeMilliSec, bytes, isRead); - _retryThread = retryThread; - _cancellableCallback = callback; - } - - /** - * Request a retry. - * - * @return True if the request was sent successfully. - */ - boolean requestRetry() { - return _retryThread.sendRetryRequest(this); - } - - /** - * Notify the pending callback that retry has been cancelled. - */ - void cancel() { - _cancellableCallback.notifyCallers(); - } - - /** - * The actual retry operation logic. - */ - protected abstract void doRetry() throws Exception; -} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java deleted file mode 100644 index c59d423..0000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.helix.zookeeper.zkclient.callback; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZkAsyncRetryThread extends Thread { - private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryThread.class); - private BlockingQueue<ZkAsyncRetryCallContext> _retryContexts = new LinkedBlockingQueue<>(); - private volatile boolean _isReady = true; - - public ZkAsyncRetryThread(String name) { - setDaemon(true); - setName("ZkClient-AsyncCallback-Retry-" + getId() + "-" + name); - } - - @Override - public void run() { - LOG.info("Starting ZkClient AsyncCallback retry thread."); - try { - while (!isInterrupted()) { - ZkAsyncRetryCallContext context = _retryContexts.take(); - try { - context.doRetry(); - } catch (InterruptedException | ZkInterruptedException e) { - // if interrupted, stop retrying and interrupt the thread. - context.cancel(); - interrupt(); - } catch (Throwable e) { - LOG.error("Error retrying callback " + context, e); - } - } - } catch (InterruptedException e) { - LOG.info("ZkClient AsyncCallback retry thread is interrupted."); - } - synchronized (this) { - // Mark ready to be false, so no new requests will be sent. - _isReady = false; - // Notify to all the callers waiting for the result. - for (ZkAsyncRetryCallContext context : _retryContexts) { - context.cancel(); - } - } - LOG.info("Terminate ZkClient AsyncCallback retry thread."); - } - - synchronized boolean sendRetryRequest(ZkAsyncRetryCallContext context) { - if (_isReady) { - _retryContexts.add(context); - return true; - } - return false; - } -} diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java index 2b8b1b3..51eda80 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java @@ -58,7 +58,7 @@ public class ZkTestBase { * Multiple ZK references */ // The following maps hold ZK connect string as keys - protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>(); + protected final Map<String, ZkServer> _zkServerMap = new HashMap<>(); protected static int _numZk = 1; // Initial value @BeforeSuite diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java deleted file mode 100644 index 4e5b06f..0000000 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java +++ /dev/null @@ -1,405 +0,0 @@ -package org.apache.helix.zookeeper.impl.client; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.helix.zookeeper.impl.ZkTestBase; -import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; -import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext; -import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.UNKNOWN_RET_CODE; -import static org.apache.zookeeper.KeeperException.Code.CONNECTIONLOSS; - -/** - * Note this is a whitebox test to test the async operation callback/context. - * We don't have a good way to simulate an async ZK operation failure in the server side yet. - */ -public class TestZkClientAsyncRetry extends ZkTestBase { - private final String TEST_ROOT = String.format("/%s", getClass().getSimpleName()); - private final String NODE_PATH = TEST_ROOT + "/async"; - - private org.apache.helix.zookeeper.zkclient.ZkClient _zkClient; - private String _zkServerAddress; - - @BeforeClass - public void beforeClass() { - _zkClient = _zkServerMap.values().iterator().next().getZkClient(); - _zkServerAddress = _zkClient.getServers(); - _zkClient.createPersistent(TEST_ROOT); - } - - @AfterClass - public void afterClass() { - _zkClient.deleteRecursively(TEST_ROOT); - _zkClient.close(); - } - - private boolean waitAsyncOperation(ZkAsyncCallbacks.DefaultCallback callback, long timeout) { - final boolean[] ret = { false }; - Thread waitThread = new Thread(() -> ret[0] = callback.waitForSuccess()); - waitThread.start(); - try { - waitThread.join(timeout); - waitThread.interrupt(); - return ret[0]; - } catch (InterruptedException e) { - return false; - } - } - - @Test - public void testAsyncRetryCategories() { - MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); - try { - ZNRecord tmpRecord = new ZNRecord("tmpRecord"); - tmpRecord.setSimpleField("foo", "bar"); - // Loop all possible error codes to test async create. - // Only connectivity issues will be retried, the other issues will be return error immediately. - for (KeeperException.Code code : KeeperException.Code.values()) { - if (code == KeeperException.Code.OK) { - continue; - } - ZkAsyncCallbacks.CreateCallbackHandler createCallback = - new ZkAsyncCallbacks.CreateCallbackHandler(); - Assert.assertEquals(createCallback.getRc(), UNKNOWN_RET_CODE); - testZkClient.setAsyncCallRC(code.intValue()); - if (code == CONNECTIONLOSS || code == KeeperException.Code.SESSIONEXPIRED - || code == KeeperException.Code.SESSIONMOVED) { - // Async create will be pending due to the mock error rc is retryable. - testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback); - Assert.assertFalse(createCallback.isOperationDone()); - Assert.assertEquals(createCallback.getRc(), code.intValue()); - // Change the mock response - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - // Async retry will succeed now. Wait until the operation is successfully done and verify. - Assert.assertTrue(waitAsyncOperation(createCallback, 1000)); - Assert.assertEquals(createCallback.getRc(), KeeperException.Code.OK.intValue()); - Assert.assertTrue(testZkClient.exists(NODE_PATH)); - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - } else { - // Async create will fail due to the mock error rc is not recoverable. - testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback); - Assert.assertTrue(waitAsyncOperation(createCallback, 1000)); - Assert.assertEquals(createCallback.getRc(), code.intValue()); - Assert.assertEquals(testZkClient.getAndResetRetryCount(), 0); - } - testZkClient.delete(NODE_PATH); - Assert.assertFalse(testZkClient.exists(NODE_PATH)); - } - } finally { - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - testZkClient.close(); - _zkClient.delete(NODE_PATH); - } - } - - @Test(dependsOnMethods = "testAsyncRetryCategories") - public void testAsyncWriteRetry() { - MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); - try { - ZNRecord tmpRecord = new ZNRecord("tmpRecord"); - tmpRecord.setSimpleField("foo", "bar"); - testZkClient.createPersistent(NODE_PATH, tmpRecord); - - // 1. Test async set retry - ZkAsyncCallbacks.SetDataCallbackHandler setCallback = - new ZkAsyncCallbacks.SetDataCallbackHandler(); - Assert.assertEquals(setCallback.getRc(), UNKNOWN_RET_CODE); - - tmpRecord.setSimpleField("test", "data"); - testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); - // Async set will be pending due to the mock error rc is retryable. - testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback); - Assert.assertFalse(setCallback.isOperationDone()); - Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue()); - // Change the mock return code. - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - // Async retry will succeed now. Wait until the operation is successfully done and verify. - Assert.assertTrue(waitAsyncOperation(setCallback, 1000)); - Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue()); - Assert.assertEquals(((ZNRecord) testZkClient.readData(NODE_PATH)).getSimpleField("test"), - "data"); - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - - // 2. Test async delete - ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback = - new ZkAsyncCallbacks.DeleteCallbackHandler(); - Assert.assertEquals(deleteCallback.getRc(), UNKNOWN_RET_CODE); - - testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); - // Async delete will be pending due to the mock error rc is retryable. - testZkClient.asyncDelete(NODE_PATH, deleteCallback); - Assert.assertFalse(deleteCallback.isOperationDone()); - Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue()); - // Change the mock return code. - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - // Async retry will succeed now. Wait until the operation is successfully done and verify. - Assert.assertTrue(waitAsyncOperation(deleteCallback, 1000)); - Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue()); - Assert.assertFalse(testZkClient.exists(NODE_PATH)); - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - } finally { - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - testZkClient.close(); - _zkClient.delete(NODE_PATH); - } - } - - @Test(dependsOnMethods = "testAsyncWriteRetry") - public void testAsyncReadRetry() { - MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); - try { - ZNRecord tmpRecord = new ZNRecord("tmpRecord"); - tmpRecord.setSimpleField("foo", "bar"); - testZkClient.createPersistent(NODE_PATH, tmpRecord); - - // 1. Test async exist check - ZkAsyncCallbacks.ExistsCallbackHandler existsCallback = - new ZkAsyncCallbacks.ExistsCallbackHandler(); - Assert.assertEquals(existsCallback.getRc(), UNKNOWN_RET_CODE); - - testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); - // Async exist check will be pending due to the mock error rc is retryable. - testZkClient.asyncExists(NODE_PATH, existsCallback); - Assert.assertFalse(existsCallback.isOperationDone()); - Assert.assertEquals(existsCallback.getRc(), CONNECTIONLOSS.intValue()); - // Change the mock return code. - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - // Async retry will succeed now. Wait until the operation is successfully done and verify. - Assert.assertTrue(waitAsyncOperation(existsCallback, 1000)); - Assert.assertEquals(existsCallback.getRc(), KeeperException.Code.OK.intValue()); - Assert.assertTrue(existsCallback._stat != null); - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - - // 2. Test async get - ZkAsyncCallbacks.GetDataCallbackHandler getCallback = - new ZkAsyncCallbacks.GetDataCallbackHandler(); - Assert.assertEquals(getCallback.getRc(), UNKNOWN_RET_CODE); - - testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); - // Async get will be pending due to the mock error rc is retryable. - testZkClient.asyncGetData(NODE_PATH, getCallback); - Assert.assertFalse(getCallback.isOperationDone()); - Assert.assertEquals(getCallback.getRc(), CONNECTIONLOSS.intValue()); - // Change the mock return code. - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - // Async retry will succeed now. Wait until the operation is successfully done and verify. - Assert.assertTrue(waitAsyncOperation(getCallback, 1000)); - Assert.assertEquals(getCallback.getRc(), KeeperException.Code.OK.intValue()); - ZNRecord record = testZkClient.deserialize(getCallback._data, NODE_PATH); - Assert.assertEquals(record.getSimpleField("foo"), "bar"); - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - } finally { - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - testZkClient.close(); - _zkClient.delete(NODE_PATH); - } - } - - @Test(dependsOnMethods = "testAsyncReadRetry") - public void testAsyncRequestCleanup() { - int cbCount = 10; - MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress); - try { - ZNRecord tmpRecord = new ZNRecord("tmpRecord"); - tmpRecord.setSimpleField("foo", "bar"); - testZkClient.createPersistent(NODE_PATH, tmpRecord); - - // Create 10 async exists check requests - ZkAsyncCallbacks.ExistsCallbackHandler[] existsCallbacks = - new ZkAsyncCallbacks.ExistsCallbackHandler[cbCount]; - for (int i = 0; i < cbCount; i++) { - existsCallbacks[i] = new ZkAsyncCallbacks.ExistsCallbackHandler(); - } - testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue()); - // All async exist check calls will be pending due to the mock error rc is retryable. - for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) { - testZkClient.asyncExists(NODE_PATH, cb); - Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue()); - } - // Wait for a while, no callback finishes - Assert.assertFalse(waitAsyncOperation(existsCallbacks[0], 1000)); - for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) { - Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue()); - Assert.assertFalse(cb.isOperationDone()); - } - testZkClient.close(); - // All callback retry will be cancelled because the zkclient is closed. - for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) { - Assert.assertTrue(waitAsyncOperation(cb, 1000)); - Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue()); - } - Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1); - } finally { - testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue()); - testZkClient.close(); - _zkClient.delete(NODE_PATH); - } - } - - /** - * Mock client to whitebox test async functionality. - */ - class MockAsyncZkClient extends ZkClient { - private static final long RETRY_INTERVAL_MS = 500; - private long _retryCount = 0; - - /** - * If the specified return code is OK, call the real function. - * Otherwise, trigger the callback with the specified RC without triggering the real ZK call. - */ - private int _asyncCallRetCode = KeeperException.Code.OK.intValue(); - - public MockAsyncZkClient(String zkAddress) { - super(zkAddress); - setZkSerializer(new ZNRecordSerializer()); - } - - public void setAsyncCallRC(int rc) { - _asyncCallRetCode = rc; - } - - public long getAndResetRetryCount() { - long tmpCount = _retryCount; - _retryCount = 0; - return tmpCount; - } - - @Override - public void asyncCreate(String path, Object datat, CreateMode mode, - ZkAsyncCallbacks.CreateCallbackHandler cb) { - if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) { - super.asyncCreate(path, datat, mode, cb); - return; - } else { - cb.processResult(_asyncCallRetCode, path, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) { - @Override - protected void doRetry() { - _retryCount++; - try { - Thread.sleep(RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); - } - asyncCreate(path, datat, mode, cb); - } - }, null); - } - } - - @Override - public void asyncSetData(String path, Object datat, int version, - ZkAsyncCallbacks.SetDataCallbackHandler cb) { - if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) { - super.asyncSetData(path, datat, version, cb); - return; - } else { - cb.processResult(_asyncCallRetCode, path, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) { - @Override - protected void doRetry() { - _retryCount++; - try { - Thread.sleep(RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); - } - asyncSetData(path, datat, version, cb); - } - }, null); - } - } - - @Override - public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) { - if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) { - super.asyncGetData(path, cb); - return; - } else { - cb.processResult(_asyncCallRetCode, path, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) { - @Override - protected void doRetry() { - _retryCount++; - try { - Thread.sleep(RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); - } - asyncGetData(path, cb); - } - }, null, null); - } - } - - @Override - public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) { - if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) { - super.asyncExists(path, cb); - return; - } else { - cb.processResult(_asyncCallRetCode, path, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) { - @Override - protected void doRetry() { - _retryCount++; - try { - Thread.sleep(RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); - } - asyncExists(path, cb); - } - }, null); - } - } - - @Override - public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) { - if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) { - super.asyncDelete(path, cb); - return; - } else { - cb.processResult(_asyncCallRetCode, path, - new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) { - @Override - protected void doRetry() { - _retryCount++; - try { - Thread.sleep(RETRY_INTERVAL_MS); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); - } - asyncDelete(path, cb); - } - }); - } - } - } -}
