This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0f9476d8c314d9f69c896498626ee1ac8f0c561a Author: xyuanlu <[email protected]> AuthorDate: Sat Mar 18 22:34:16 2023 -0700 use reconnect timeout for crud retry timeout (#2410) --- .../metaclient/constants/MetaClientConstants.java | 3 ++ .../metaclient/factories/MetaClientConfig.java | 37 +++------------------- .../helix/metaclient/impl/zk/ZkMetaClient.java | 14 ++++---- .../impl/zk/factory/ZkMetaClientConfig.java | 21 ++++++------ .../policy/MetaClientReconnectPolicy.java | 7 +++- 5 files changed, 33 insertions(+), 49 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java index 2c554189b..d05c0ea5b 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java @@ -42,5 +42,8 @@ public final class MetaClientConstants { // Initial backoff window for exponential reconnect back off policy. by default is 500 ms. public static final long DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS = 500; + // Auto Reconnect timeout + public static final long DEFAULT_AUTO_RECONNECT_TIMEOUT_MS = 30 * 60 * 1000; + //public static final long DEFAULT_MAX_LINEAR_BACKOFF_RETRY_WINDOW_MS = 5*1000; } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java index 081727741..501669c7e 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java @@ -19,9 +19,9 @@ package org.apache.helix.metaclient.factories; * under the License. 
*/ +import org.apache.helix.metaclient.constants.MetaClientConstants; import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy; import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy; -import org.apache.helix.metaclient.constants.MetaClientConstants; public class MetaClientConfig { @@ -34,10 +34,6 @@ public class MetaClientConfig { // Wait for init timeout time until connection is initiated private final long _connectionInitTimeoutInMillis; - // Operation failed because of connection lost will be auto retried if connection has recovered - // within timeout time. - private final long _operationRetryTimeoutInMillis; - // When a client becomes partitioned from the metadata service for more than session timeout, // new session will be established when reconnect. private final long _sessionTimeoutInMillis; @@ -57,10 +53,6 @@ public class MetaClientConfig { return _connectionInitTimeoutInMillis; } - public long getOperationRetryTimeoutInMillis() { - return _operationRetryTimeoutInMillis; - } - public boolean isAuthEnabled() { return _enableAuth; } @@ -82,11 +74,10 @@ public class MetaClientConfig { // private boolean _resetWatchWhenReConnect; // re-register previous existing watcher when reconnect protected MetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis, - long operationRetryTimeoutInMillis, long sessionTimeoutInMillis, - MetaClientReconnectPolicy metaClientReconnectPolicy, boolean enableAuth, StoreType storeType) { + long sessionTimeoutInMillis, MetaClientReconnectPolicy metaClientReconnectPolicy, + boolean enableAuth, StoreType storeType) { _connectionAddress = connectionAddress; _connectionInitTimeoutInMillis = connectionInitTimeoutInMillis; - _operationRetryTimeoutInMillis = operationRetryTimeoutInMillis; _sessionTimeoutInMillis = sessionTimeoutInMillis; _metaClientReconnectPolicy = metaClientReconnectPolicy; _enableAuth = enableAuth; @@ -98,7 +89,6 @@ public class MetaClientConfig { protected long 
_connectionInitTimeoutInMillis; protected long _sessionTimeoutInMillis; - protected long _operationRetryTimeout; protected boolean _enableAuth; protected StoreType _storeType; protected MetaClientReconnectPolicy _metaClientReconnectPolicy; @@ -107,11 +97,12 @@ public class MetaClientConfig { public MetaClientConfig build() { validate(); return new MetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis, - _operationRetryTimeout, _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, _storeType); + _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, _storeType); } public MetaClientConfigBuilder() { // set default values + setStoreType(StoreType.ZOOKEEPER); setAuthEnabled(false); setConnectionInitTimeoutInMillis(MetaClientConstants.DEFAULT_CONNECTION_INIT_TIMEOUT_MS); setSessionTimeoutInMillis(MetaClientConstants.DEFAULT_SESSION_TIMEOUT_MS); @@ -137,16 +128,6 @@ public class MetaClientConfig { return self(); } - /** - * Set timeout in ms for operation retry timeout - * @param timeout - * @return - */ - public B setOperationRetryTimeoutInMillis(long timeout) { - _operationRetryTimeout = timeout; - return self(); - } - /** * Set reconnect policy when connection is lost or expired. By default is * ExponentialBackoffReconnectPolicy @@ -184,14 +165,6 @@ public class MetaClientConfig { _metaClientReconnectPolicy = new ExponentialBackoffReconnectPolicy(); } - // check if reconnect policy and retry policy conflict. - if (_metaClientReconnectPolicy.getPolicyName() - == MetaClientReconnectPolicy.RetryPolicyName.NO_RETRY && _operationRetryTimeout > 0) { - throw new IllegalArgumentException( - "MetaClientConfig.Builder: Incompatible operationRetryTimeout with NO_RETRY ReconnectPolicy."); - } - // TODO: check operationRetryTimeout should be less than ReconnectPolicy timeout. 
- if (_storeType == null || _connectionAddress == null) { throw new IllegalArgumentException( "MetaClientConfig.Builder: store type or connection string is null"); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 2a9262208..9614e53d6 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -60,18 +60,20 @@ import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translat public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class); private final ZkClient _zkClient; - private final int _connectionTimeout; + private final long _initConnectionTimeout; + private final long _reconnectTimeout; public ZkMetaClient(ZkMetaClientConfig config) { - _connectionTimeout = (int) config.getConnectionInitTimeoutInMillis(); + _initConnectionTimeout = config.getConnectionInitTimeoutInMillis(); + _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout(); // TODO: Right now ZkClient reconnect using exp backoff with fixed max backoff interval. We should // 1. Allow user to config max backoff interval (next PR) // 2. 
Allow user to config reconnect policy (future PR) _zkClient = new ZkClient( new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), - _connectionTimeout, config.getOperationRetryTimeoutInMillis(), config.getZkSerializer(), - config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(), - config.getMonitorRootPathOnly(), false); + (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/, + config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), + config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false); } @Override @@ -266,7 +268,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { public void connect() { // TODO: throws IllegalStateException when already connected try { - _zkClient.connect(_connectionTimeout, _zkClient); + _zkClient.connect(_initConnectionTimeout, _zkClient); } catch (ZkException e) { throw translateZkExceptionToMetaclientException(e); } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java index 3f21fa7df..82c9bb20a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java @@ -19,8 +19,8 @@ package org.apache.helix.metaclient.impl.zk.factory; * under the License. 
*/ -import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy; import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer; @@ -61,12 +61,11 @@ public class ZkMetaClientConfig extends MetaClientConfig { } protected ZkMetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis, - long operationRetryTimeoutInMillis, long sessionTimeoutInMillis, - MetaClientReconnectPolicy reconnectPolicy, boolean enableAuth, StoreType storeType, - String monitorType, String monitorKey, String monitorInstanceName, + long sessionTimeoutInMillis, MetaClientReconnectPolicy reconnectPolicy, boolean enableAuth, + StoreType storeType, String monitorType, String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly, PathBasedZkSerializer zkSerializer) { - super(connectionAddress, connectionInitTimeoutInMillis, operationRetryTimeoutInMillis, - sessionTimeoutInMillis, reconnectPolicy, enableAuth, storeType); + super(connectionAddress, connectionInitTimeoutInMillis, sessionTimeoutInMillis, reconnectPolicy, + enableAuth, storeType); _zkSerializer = zkSerializer; _monitorType = monitorType; _monitorKey = monitorKey; @@ -130,11 +129,10 @@ public class ZkMetaClientConfig extends MetaClientConfig { @Override public ZkMetaClientConfig build() { - if (_zkSerializer == null) { - _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); - } + validate(); + return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis, - _operationRetryTimeout, _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, + _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, _monitorKey, 
_monitorInstanceName, _monitorRootPathOnly, _zkSerializer); } @@ -142,6 +140,9 @@ public class ZkMetaClientConfig extends MetaClientConfig { @Override protected void validate() { super.validate(); + if (_zkSerializer == null) { + _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); + } } } } \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java index 3d4cba3de..be80fdd59 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java @@ -19,6 +19,9 @@ package org.apache.helix.metaclient.policy; * under the License. */ +import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_AUTO_RECONNECT_TIMEOUT_MS; + + /** * Policy to define client re-establish connection behavior when connection to underlying metadata * store is expired. @@ -34,5 +37,7 @@ public interface MetaClientReconnectPolicy { RetryPolicyName getPolicyName(); - // TODO: add reconnect timeout + default long getAutoReconnectTimeout() { + return DEFAULT_AUTO_RECONNECT_TIMEOUT_MS; + } }
