This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5f7f47a839a0755cd21f7899a83f2e1e76ceaa16 Author: xyuanlu <[email protected]> AuthorDate: Mon Dec 5 15:42:35 2022 -0800 Add Meta client factory and ZkMetaClient constructor (#2291) Add Meta client factory and ZkMetaClient constructor --- .../metaclient/constants/MetaClientConstants.java | 41 ++++++ .../metaclient/constants/MetaClientException.java | 23 ++++ .../metaclient/factories/MetaClientConfig.java | 93 +++++++++---- .../helix/metaclient/impl/zk/ZkMetaClient.java | 15 ++- .../impl/zk/factory/ZkMetaClientConfig.java | 145 +++++++++++++++++++++ .../impl/zk/factory/ZkMetaClientFactory.java | 36 +++++ 6 files changed, 321 insertions(+), 32 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java new file mode 100644 index 000000000..e38b1f6a9 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java @@ -0,0 +1,41 @@ +package org.apache.helix.metaclient.constants; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public final class MetaClientConstants { + + private MetaClientConstants(){ + + } + + // Stop retrying when we reach timeout + //TODO The value should be the same as Helix default ZK retry time. Modify when change #2293 merged + public static final int DEFAULT_OPERATION_RETRY_TIMEOUT_MS = Integer.MAX_VALUE; + + // maxMsToWaitUntilConnected + public static final int DEFAULT_CONNECTION_INIT_TIMEOUT_MS = 60 * 1000; + + // When a client becomes partitioned from the metadata service for more than session timeout, + // new session will be established. + public static final int DEFAULT_SESSION_TIMEOUT_MS = 30 * 1000; + + + +} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java new file mode 100644 index 000000000..cf8a07c56 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientException.java @@ -0,0 +1,23 @@ +package org.apache.helix.metaclient.constants; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public final class MetaClientException { +} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java index f317fd923..11f16e5b5 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java @@ -19,14 +19,23 @@ package org.apache.helix.metaclient.factories; * under the License. */ -class MetaClientConfig { +import org.apache.helix.metaclient.constants.MetaClientConstants; + +public class MetaClientConfig { public enum StoreType { ZOOKEEPER, ETCD, CUSTOMIZED } private final String _connectionAddress; - private final long _connectionTimeout; + + // Wait for init timeout time until connection is initiated + private final long _connectionInitTimeoutInMillis; + + // When a client becomes partitioned from the metadata service for more than session timeout, + // new session will be established when reconnect. + private final long _sessionTimeoutInMillis; + private final boolean _enableAuth; private final StoreType _storeType; @@ -34,8 +43,8 @@ class MetaClientConfig { return _connectionAddress; } - public long getConnectionTimeout() { - return _connectionTimeout; + public long getConnectionInitTimeoutInMillis() { + return _connectionInitTimeoutInMillis; } public boolean isAuthEnabled() { @@ -46,6 +55,10 @@ class MetaClientConfig { return _storeType; } + public long getSessionTimeoutInMillis() { + return _sessionTimeoutInMillis; + } + // TODO: More options to add later // private boolean _autoReRegistWatcher; // re-register one time watcher when set to true // private boolean _resetWatchWhenReConnect; // re-register previous existing watcher when reconnect @@ -56,56 +69,82 @@ class MetaClientConfig { // private RetryProtocol _retryProtocol; - private MetaClientConfig(String connectionAddress, long connectionTimeout, boolean enableAuth, - StoreType storeType) { + protected MetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis, + long sessionTimeoutInMillis, boolean enableAuth, StoreType storeType) { _connectionAddress = connectionAddress; - _connectionTimeout = connectionTimeout; + _connectionInitTimeoutInMillis = connectionInitTimeoutInMillis; + _sessionTimeoutInMillis = sessionTimeoutInMillis; _enableAuth = enableAuth; _storeType = storeType; } - public static class Builder { - private String _connectionAddress; - - private long _connectionTimeout; - private boolean _enableAuth; - //private RetryProtocol _retryProtocol; - private StoreType _storeType; + public static class MetaClientConfigBuilder<B extends MetaClientConfigBuilder<B>> { + protected String _connectionAddress; + protected long _connectionInitTimeoutInMillis; + protected long _sessionTimeoutInMillis; + // protected long _operationRetryTimeout; + // protected RetryProtocol _retryProtocol; + protected boolean _enableAuth; + protected StoreType _storeType; public MetaClientConfig build() { validate(); - return new MetaClientConfig(_connectionAddress, _connectionTimeout, _enableAuth, _storeType); + return new MetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis, + _sessionTimeoutInMillis, + _enableAuth, _storeType); } - public Builder() { + public MetaClientConfigBuilder() { // set default values setAuthEnabled(false); - setConnectionTimeout(-1); + setConnectionInitTimeoutInMillis(MetaClientConstants.DEFAULT_CONNECTION_INIT_TIMEOUT_MS); + setSessionTimeoutInMillis(MetaClientConstants.DEFAULT_SESSION_TIMEOUT_MS); } - public Builder setConnectionAddress(String connectionAddress) { + public B setConnectionAddress(String connectionAddress) { _connectionAddress = connectionAddress; - return this; + return self(); } - public Builder setAuthEnabled(Boolean enableAuth) { + public B setAuthEnabled(Boolean enableAuth) { _enableAuth = enableAuth; - return this; + return self(); + } + + /** + * Set timeout in mm for connection initialization timeout + * @param timeout + * @return + */ + public B setConnectionInitTimeoutInMillis(long timeout) { + _connectionInitTimeoutInMillis = timeout; + return self(); } - public Builder setConnectionTimeout(long timeout) { - _connectionTimeout = timeout; - return this; + /** + * Set timeout in mm for session timeout. When a client becomes partitioned from the metadata + * service for more than session timeout, new session will be established. + * @param timeout + * @return + */ + public B setSessionTimeoutInMillis(long timeout) { + _sessionTimeoutInMillis = timeout; + return self(); } - public Builder setStoreType(StoreType storeType) { + public B setStoreType(StoreType storeType) { _storeType = storeType; - return this; + return self(); + } + + @SuppressWarnings("unchecked") + final B self() { + return (B) this; } - private void validate() { + protected void validate() { if (_storeType == null || _connectionAddress == null) { throw new IllegalArgumentException( "MetaClientConfig.Builder: store type or connection string is null"); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 8437e387f..194cf18f9 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -31,15 +31,21 @@ import org.apache.helix.metaclient.api.DirectChildChangeListener; import org.apache.helix.metaclient.api.DirectChildSubscribeResult; import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.OpResult; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.zkclient.ZkConnection; public class ZkMetaClient implements MetaClientInterface { - private RealmAwareZkClient _zkClient; - - public ZkMetaClient() { + private final ZkClient _zkClient; + public ZkMetaClient(ZkMetaClientConfig config) { + _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(), + (int) config.getSessionTimeoutInMillis()), + (int) config.getConnectionInitTimeoutInMillis(), -1 /*operationRetryTimeout*/, + config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), + config.getMonitorInstanceName(), config.getMonitorRootPathOnly()); } @Override @@ -147,7 +153,6 @@ public class ZkMetaClient implements MetaClientInterface { } - @Override public boolean connect() { return false; diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java new file mode 100644 index 000000000..c4190fc84 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java @@ -0,0 +1,145 @@ +package org.apache.helix.metaclient.impl.zk.factory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; + + +public class ZkMetaClientConfig extends MetaClientConfig { + + protected final PathBasedZkSerializer _zkSerializer; + + // Monitoring related fields. MBean names are crated using following variables in format of + // MonitorPrefix_monitorType_monitorKey_monitorInstanceName, where _monitorInstanceName is optional + // TODO: right now all zkClient mBean object has prefix `HelixZkClient` had coded. We should change + // it to a configurable name. + protected final String _monitorType; + protected final String _monitorKey; + protected final String _monitorInstanceName; + protected final boolean _monitorRootPathOnly; + + public PathBasedZkSerializer getZkSerializer() { + return _zkSerializer; + } + + public String getMonitorType() { + return _monitorType; + } + + public String getMonitorKey() { + return _monitorKey; + } + + public String getMonitorInstanceName() { + return _monitorInstanceName; + } + + public boolean getMonitorRootPathOnly() { + return _monitorRootPathOnly; + } + + protected ZkMetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis, + long sessionTimeoutInMillis, boolean enableAuth, StoreType storeType, String monitorType, + String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly, + PathBasedZkSerializer zkSerializer) { + super(connectionAddress, connectionInitTimeoutInMillis, sessionTimeoutInMillis, enableAuth, + storeType); + _zkSerializer = zkSerializer; + _monitorType = monitorType; + _monitorKey = monitorKey; + _monitorInstanceName = monitorInstanceName; + _monitorRootPathOnly = monitorRootPathOnly; + + } + + public static class ZkMetaClientConfigBuilder extends MetaClientConfig.MetaClientConfigBuilder<ZkMetaClientConfigBuilder> { + + protected PathBasedZkSerializer _zkSerializer; + + // Monitoring + // Type as in MBean object + protected String _monitorType; + protected String _monitorKey; + protected String _monitorInstanceName = null; + protected boolean _monitorRootPathOnly = true; + + public ZkMetaClientConfigBuilder setZkSerializer( + org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer zkSerializer) { + this._zkSerializer = zkSerializer; + return this; + } + + public ZkMetaClientConfigBuilder setZkSerializer(ZkSerializer zkSerializer) { + this._zkSerializer = new BasicZkSerializer(zkSerializer); + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is required for enabling monitoring. + * @param monitorType + */ + public ZkMetaClientConfigBuilder setMonitorType(String monitorType) { + this._monitorType = monitorType; + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is required for enabling monitoring. + * @param monitorKey + */ + public ZkMetaClientConfigBuilder setMonitorKey(String monitorKey) { + this._monitorKey = monitorKey; + return this; + } + + /** + * Used as part of the MBean ObjectName. This item is optional. + * @param instanceName + */ + public ZkMetaClientConfigBuilder setMonitorInstanceName(String instanceName) { + this._monitorInstanceName = instanceName; + return this; + } + + public ZkMetaClientConfigBuilder setMonitorRootPathOnly(Boolean monitorRootPathOnly) { + this._monitorRootPathOnly = monitorRootPathOnly; + return this; + } + + @Override + public MetaClientConfig build() { + if (_zkSerializer == null) { + _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); + } + return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis, + _sessionTimeoutInMillis, _enableAuth, MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, + _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _zkSerializer); + } + + @Override + protected void validate() { + super.validate(); + } + } +} \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java new file mode 100644 index 000000000..b3480b143 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java @@ -0,0 +1,36 @@ +package org.apache.helix.metaclient.impl.zk.factory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.factories.MetaClientFactory; +import org.apache.helix.metaclient.impl.zk.ZkMetaClient; + +public class ZkMetaClientFactory extends MetaClientFactory { + @Override + public MetaClientInterface getMetaClient(MetaClientConfig config) { + if (config.getStoreType() == MetaClientConfig.StoreType.ZOOKEEPER + && config instanceof ZkMetaClientConfig) { + return new ZkMetaClient((ZkMetaClientConfig) config); + } + throw new IllegalArgumentException("Invalid MetaClientConfig type."); + } +}
