This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability_merge in repository https://gitbox.apache.org/repos/asf/helix.git
commit ea880eedf6f72ac8ac54adf81caca118e68131d8 Author: Hunter Lee <[email protected]> AuthorDate: Mon Mar 2 20:48:10 2020 -0800 Make ConfigAccessor and ZkUtil realm-aware (#838) To make Helix Java APIs realm-aware, we first make ConfigAccessor and ZkUtil realm-aware by instrumenting these APIs with a Builder and RealmAwareZkClients. The Builder pattern is chosen because it is a scalable option when there are a lot of configurable parameters. It makes it easy to validate the given parameters as well. --- .../main/java/org/apache/helix/ConfigAccessor.java | 100 +++++++++++++++++- .../java/org/apache/helix/manager/zk/ZKUtil.java | 54 ++++++---- metadata-store-directory-common/pom.xml | 7 -- .../helix/zookeeper/api/client/HelixZkClient.java | 5 +- .../zookeeper/api/client/RealmAwareZkClient.java | 112 +++++++++++++++++++-- .../impl/client/RealmAwareZkClientTestBase.java | 7 +- 6 files changed, 239 insertions(+), 46 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java index 8ae7fd5..ad77b4e 100644 --- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java @@ -19,6 +19,7 @@ package org.apache.helix; * under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,9 +40,12 @@ import org.apache.helix.model.RESTConfig; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.util.HelixUtil; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.util.StringTemplate; -import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,12 +78,37 @@ public class ConfigAccessor { // @formatter:on } - private final HelixZkClient _zkClient; + private final RealmAwareZkClient _zkClient; // true if ConfigAccessor was instantiated with a HelixZkClient, false otherwise // This is used for close() to determine how ConfigAccessor should close the underlying ZkClient private final boolean _usesExternalZkClient; /** + * Constructor that creates a realm-aware ConfigAccessor using a builder. + * @param builder + */ + private ConfigAccessor(Builder builder) throws IOException, InvalidRoutingDataException { + switch (builder._realmMode) { + case MULTI_REALM: + // TODO: make sure FederatedZkClient is created correctly + // TODO: pass in MSDS endpoint or pass in _realmAwareZkConnectionConfig + String msdsEndpoint = builder._realmAwareZkConnectionConfig.getMsdsEndpoint(); + _zkClient = new FederatedZkClient(); + break; + case SINGLE_REALM: + // Create a HelixZkClient: Use a SharedZkClient because ConfigAccessor does not need to do + // ephemeral operations + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(builder._realmAwareZkConnectionConfig.createZkConnectionConfig(), + builder._realmAwareZkClientConfig.createHelixZkClientConfig()); + break; + default: + throw new HelixException("Invalid RealmMode given: " + builder._realmMode); + } + _usesExternalZkClient = false; + } + + /** * Initialize an accessor with a Zookeeper client * Note: it is recommended to use the other constructor instead to avoid having to create a * HelixZkClient. @@ -945,4 +974,71 @@ public class ConfigAccessor { _zkClient.close(); } } + + public static class Builder { + private String _zkAddress; + private RealmAwareZkClient.RealmMode _realmMode; + private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig; + private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig; + + public Builder() { + } + + public Builder setZkAddress(String zkAddress) { + _zkAddress = zkAddress; + return this; + } + + public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) { + _realmMode = realmMode; + return this; + } + + public Builder setRealmAwareZkConnectionConfig( + RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) { + _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; + return this; + } + + public Builder setRealmAwareZkClientConfig( + RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) { + _realmAwareZkClientConfig = realmAwareZkClientConfig; + return this; + } + + public ConfigAccessor build() throws Exception { + validate(); + return new ConfigAccessor(this); + } + + /** + * Validate the given parameters before creating an instance of ConfigAccessor. + */ + private void validate() { + // Resolve RealmMode based on other parameters + boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty(); + if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) { + throw new HelixException( + "ConfigAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!"); + } + if (_realmMode == null) { + _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM + : RealmAwareZkClient.RealmMode.MULTI_REALM; + } + + // Resolve RealmAwareZkClientConfig + boolean isZkClientConfigSet = _realmAwareZkClientConfig != null; + // Resolve which clientConfig to use + _realmAwareZkClientConfig = + isZkClientConfigSet ? _realmAwareZkClientConfig.createHelixZkClientConfig() + : new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()); + + // Resolve RealmAwareZkConnectionConfig + if (_realmAwareZkConnectionConfig == null) { + // If not set, create a default one + _realmAwareZkConnectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(); + } + } + } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index 1c5784f..70042ff 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -23,11 +23,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.InstanceType; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; @@ -36,6 +36,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Using this ZKUtil class for production purposes is NOT recommended since a lot of the static * methods require a ZkClient instance to be passed in. @@ -65,7 +66,7 @@ public final class ZKUtil { return result; } - public static boolean isClusterSetup(String clusterName, HelixZkClient zkClient) { + public static boolean isClusterSetup(String clusterName, RealmAwareZkClient zkClient) { if (clusterName == null) { logger.info("Fail to check cluster setup : cluster name is null!"); return false; @@ -75,7 +76,7 @@ public final class ZKUtil { logger.info("Fail to check cluster setup : zookeeper client is null!"); return false; } - ArrayList<String> requiredPaths = new ArrayList<String>(); + List<String> requiredPaths = new ArrayList<>(); requiredPaths.add(PropertyPathBuilder.idealState(clusterName)); requiredPaths.add(PropertyPathBuilder.clusterConfig(clusterName)); requiredPaths.add(PropertyPathBuilder.instanceConfig(clusterName)); @@ -92,15 +93,21 @@ public final class ZKUtil { requiredPaths.add(PropertyPathBuilder.controllerHistory(clusterName)); boolean isValid = true; - BaseDataAccessor<Object> baseAccessor = new ZkBaseDataAccessor<Object>(zkClient); - boolean[] ret = baseAccessor.exists(requiredPaths, 0); + boolean[] ret = new boolean[requiredPaths.size()]; + for (int i = 0; i < requiredPaths.size(); i++) { + try { + ret[i] = zkClient.exists(requiredPaths.get(i)); + } catch (Exception e) { + ret[i] = false; + } + } StringBuilder errorMsg = new StringBuilder(); for (int i = 0; i < ret.length; i++) { if (!ret[i]) { isValid = false; - errorMsg - .append(("Invalid cluster setup, missing znode path: " + requiredPaths.get(i)) + "\n"); + errorMsg.append("Invalid cluster setup, missing znode path: ").append(requiredPaths.get(i)) + .append("\n"); } } @@ -132,10 +139,10 @@ public final class ZKUtil { return result; } - public static boolean isInstanceSetup(HelixZkClient zkclient, String clusterName, + public static boolean isInstanceSetup(RealmAwareZkClient zkclient, String clusterName, String instanceName, InstanceType type) { if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) { - ArrayList<String> requiredPaths = new ArrayList<>(); + List<String> requiredPaths = new ArrayList<>(); requiredPaths.add(PropertyPathBuilder.instanceConfig(clusterName, instanceName)); requiredPaths.add(PropertyPathBuilder.instanceMessage(clusterName, instanceName)); requiredPaths.add(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName)); @@ -157,7 +164,6 @@ public final class ZKUtil { if (!zkclient.exists(historyPath)) { zkclient.createPersistent(historyPath, true); } - } return isValid; } @@ -181,7 +187,8 @@ public final class ZKUtil { } } - public static void createChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) { + public static void createChildren(RealmAwareZkClient client, String parentPath, + List<ZNRecord> list) { client.createPersistent(parentPath, true); if (list != null) { for (ZNRecord record : list) { @@ -206,7 +213,8 @@ public final class ZKUtil { } } - public static void createChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) { + public static void createChildren(RealmAwareZkClient client, String parentPath, + ZNRecord nodeRecord) { client.createPersistent(parentPath, true); String id = nodeRecord.getId(); @@ -230,7 +238,8 @@ public final class ZKUtil { } } - public static void dropChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) { + public static void dropChildren(RealmAwareZkClient client, String parentPath, + List<ZNRecord> list) { // TODO: check if parentPath exists if (list != null) { for (ZNRecord record : list) { @@ -255,7 +264,8 @@ public final class ZKUtil { } } - public static void dropChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) { + public static void dropChildren(RealmAwareZkClient client, String parentPath, + ZNRecord nodeRecord) { // TODO: check if parentPath exists String id = nodeRecord.getId(); String temp = parentPath + "/" + id; @@ -280,7 +290,7 @@ public final class ZKUtil { return result; } - public static List<ZNRecord> getChildren(HelixZkClient client, String path) { + public static List<ZNRecord> getChildren(RealmAwareZkClient client, String path) { // parent watch will be set by zkClient List<String> children = client.getChildren(path); if (children == null || children.size() == 0) { @@ -321,7 +331,7 @@ public final class ZKUtil { } } - public static void updateIfExists(HelixZkClient client, String path, final ZNRecord record, + public static void updateIfExists(RealmAwareZkClient client, String path, final ZNRecord record, boolean mergeOnUpdate) { if (client.exists(path)) { DataUpdater<Object> updater = new DataUpdater<Object>() { @@ -353,7 +363,7 @@ public final class ZKUtil { } } - public static void createOrMerge(HelixZkClient client, String path, final ZNRecord record, + public static void createOrMerge(RealmAwareZkClient client, String path, final ZNRecord record, final boolean persistent, final boolean mergeOnUpdate) { int retryCount = 0; while (retryCount < RETRYLIMIT) { @@ -408,7 +418,7 @@ public final class ZKUtil { } } - public static void createOrUpdate(HelixZkClient client, String path, final ZNRecord record, + public static void createOrUpdate(RealmAwareZkClient client, String path, final ZNRecord record, final boolean persistent, final boolean mergeOnUpdate) { int retryCount = 0; while (retryCount < RETRYLIMIT) { @@ -457,8 +467,8 @@ public final class ZKUtil { } } - public static void asyncCreateOrMerge(HelixZkClient client, String path, final ZNRecord record, - final boolean persistent, final boolean mergeOnUpdate) { + public static void asyncCreateOrMerge(RealmAwareZkClient client, String path, + final ZNRecord record, final boolean persistent, final boolean mergeOnUpdate) { try { if (client.exists(path)) { if (mergeOnUpdate) { @@ -510,7 +520,7 @@ public final class ZKUtil { } } - public static void createOrReplace(HelixZkClient client, String path, final ZNRecord record, + public static void createOrReplace(RealmAwareZkClient client, String path, final ZNRecord record, final boolean persistent) { int retryCount = 0; while (retryCount < RETRYLIMIT) { @@ -553,7 +563,7 @@ public final class ZKUtil { } } - public static void subtract(HelixZkClient client, final String path, + public static void subtract(RealmAwareZkClient client, final String path, final ZNRecord recordTosubtract) { int retryCount = 0; while (retryCount < RETRYLIMIT) { diff --git a/metadata-store-directory-common/pom.xml b/metadata-store-directory-common/pom.xml index 1b0d964..a38d287 100644 --- a/metadata-store-directory-common/pom.xml +++ b/metadata-store-directory-common/pom.xml @@ -33,8 +33,6 @@ under the License. <properties> <osgi.import> - org.apache.commons.cli*, - org.apache.commons.io*;version="[1.4,2)", org.slf4j*;version="[1.6,2)", * </osgi.import> @@ -69,11 +67,6 @@ under the License. <version>3.8.1</version> </dependency> <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>1.2</version> - </dependency> - <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <scope>test</scope> diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java index 03bf000..d9f7461 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java @@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.api.client; * under the License. */ -import org.apache.helix.zookeeper.zkclient.ZkClient; import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; @@ -150,8 +149,8 @@ public interface HelixZkClient extends RealmAwareZkClient { } @Override - public ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) { - this._connectInitTimeout = _connectInitTimeout; + public ZkClientConfig setConnectInitTimeout(long connectInitTimeout) { + this._connectInitTimeout = connectInitTimeout; return this; } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index e466d36..fb10073 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -19,9 +19,13 @@ package org.apache.helix.zookeeper.api.client; * under the License. */ +import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.zookeeper.exception.ZkClientException; +import org.apache.helix.zookeeper.util.HttpRoutingDataReader; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; @@ -54,9 +58,8 @@ public interface RealmAwareZkClient { * SINGLE_REALM: CRUD, change subscription, and EPHEMERAL CreateMode are supported. * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException. */ - enum MODE { - SINGLE_REALM, - MULTI_REALM + enum RealmMode { + SINGLE_REALM, MULTI_REALM } int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE; @@ -325,16 +328,18 @@ public interface RealmAwareZkClient { * ZkConnection-related configs for creating an instance of RealmAwareZkClient. */ class RealmAwareZkConnectionConfig { - /** * zkRealmShardingKey: used to deduce which ZK realm this RealmAwareZkClientConfig should connect to. - * NOTE: this field will be ignored if MODE is MULTI_REALM! + * NOTE: this field will be ignored if RealmMode is MULTI_REALM! */ - private final String _zkRealmShardingKey; - private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT; - - public RealmAwareZkConnectionConfig(String zkRealmShardingKey) { - _zkRealmShardingKey = zkRealmShardingKey; + private String _zkRealmShardingKey; + private String _msdsEndpoint; + private int _sessionTimeout; + + private RealmAwareZkConnectionConfig(Builder builder) { + _zkRealmShardingKey = builder._zkRealmShardingKey; + _msdsEndpoint = builder._msdsEndpoint; + _sessionTimeout = builder._sessionTimeout; } @Override @@ -373,6 +378,81 @@ public interface RealmAwareZkClient { public int getSessionTimeout() { return _sessionTimeout; } + + public String getMsdsEndpoint() { + return _msdsEndpoint; + } + + public HelixZkClient.ZkConnectionConfig createZkConnectionConfig() + throws IOException, InvalidRoutingDataException { + // Convert to a single-realm HelixZkClient's ZkConnectionConfig + if (_zkRealmShardingKey == null || _zkRealmShardingKey.isEmpty()) { + throw new ZkClientException( + "Cannot create ZkConnectionConfig because ZK realm sharding key is either null or empty!"); + } + + String zkAddress; + // Look up the ZK address for the given ZK realm sharding key + if (_msdsEndpoint == null || _msdsEndpoint.isEmpty()) { + zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData() + .getMetadataStoreRealm(_zkRealmShardingKey); + } else { + zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData(_msdsEndpoint) + .getMetadataStoreRealm(_zkRealmShardingKey); + } + + return new HelixZkClient.ZkConnectionConfig(zkAddress).setSessionTimeout(_sessionTimeout); + } + + public static class Builder { + private RealmMode _realmMode; + private String _zkRealmShardingKey; + private String _msdsEndpoint; + private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT; + + public Builder() { + } + + public Builder setRealmMode(RealmMode mode) { + _realmMode = mode; + return this; + } + + public Builder setZkRealmShardingKey(String shardingKey) { + _zkRealmShardingKey = shardingKey; + return this; + } + + public Builder setMsdsEndpoint(String msdsEndpoint) { + _msdsEndpoint = msdsEndpoint; + return this; + } + + public Builder setSessionTimeout(int sessionTimeout) { + _sessionTimeout = sessionTimeout; + return this; + } + + public RealmAwareZkConnectionConfig build() { + validate(); + return new RealmAwareZkConnectionConfig(this); + } + + /** + * Validate the internal fields of the builder before creating an instance. + */ + private void validate() { + boolean isShardingKeySet = _zkRealmShardingKey != null && !_zkRealmShardingKey.isEmpty(); + if (_realmMode == RealmMode.MULTI_REALM && isShardingKeySet) { + throw new IllegalArgumentException( + "ZK sharding key cannot be set on multi-realm mode! Sharding key: " + + _zkRealmShardingKey); + } + if (_realmMode == RealmMode.SINGLE_REALM && !isShardingKeySet) { + throw new IllegalArgumentException("ZK sharding key must be set on single-realm mode!"); + } + } + } } /** @@ -479,5 +559,17 @@ public interface RealmAwareZkClient { public long getConnectInitTimeout() { return _connectInitTimeout; } + + /** + * Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig. + * @return + */ + public HelixZkClient.ZkClientConfig createHelixZkClientConfig() { + return new HelixZkClient.ZkClientConfig().setZkSerializer(_zkSerializer) + .setMonitorType(_monitorType).setMonitorKey(_monitorKey) + .setMonitorInstanceName(_monitorInstanceName).setMonitorRootPathOnly(_monitorRootPathOnly) + .setOperationRetryTimeout(_operationRetryTimeout) + .setConnectInitTimeout(_connectInitTimeout); + } } } diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java index d500ce4..323d5f4 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java @@ -85,8 +85,10 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase { new RealmAwareZkClient.RealmAwareZkClientConfig(); // Create a connection config with the invalid sharding key + RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder builder = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = - new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey); + builder.setZkRealmShardingKey(invalidShardingKey).build(); try { _realmAwareZkClient = _realmAwareZkClientFactory @@ -98,7 +100,8 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase { // Use a valid sharding key this time around String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0 - connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey); + builder.setZkRealmShardingKey(validShardingKey); + connectionConfig = builder.build(); _realmAwareZkClient = _realmAwareZkClientFactory .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); }
