This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6014df6ddc89cb62163b8448cc408075b1691dc4 Author: Hunter Lee <[email protected]> AuthorDate: Fri Jul 24 15:01:04 2020 -0700 Modify realm-aware ZkClient and Helix API for configurable routing source This commit changes old MSDS-based interfaces and replaces them with a more generic configurable routing data source interfaces. This commit also adds test cases for Helix API. --- .../main/java/org/apache/helix/ConfigAccessor.java | 7 +- .../manager/zk/GenericBaseDataAccessorBuilder.java | 3 +- .../helix/manager/zk/GenericZkHelixApiBuilder.java | 21 +++--- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 14 ++-- .../java/org/apache/helix/manager/zk/ZKUtil.java | 3 +- .../helix/manager/zk/ZkBaseDataAccessor.java | 7 +- .../helix/manager/zk/ZkBucketDataAccessor.java | 2 +- .../java/org/apache/helix/tools/ClusterSetup.java | 8 +- .../multizk/TestMultiZkHelixJavaApis.java | 88 ++++++++++++++++++++-- .../apache/helix/rest/server/ServerContext.java | 5 +- .../zookeeper/api/client/RealmAwareZkClient.java | 44 ++++++++--- .../zookeeper/constant/RoutingDataReaderType.java | 14 ++++ .../zookeeper/impl/client/DedicatedZkClient.java | 18 ++--- .../zookeeper/impl/client/FederatedZkClient.java | 17 ++--- .../zookeeper/impl/client/SharedZkClient.java | 17 ++--- 15 files changed, 183 insertions(+), 85 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java index 48bcfd4..4885f31 100644 --- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java @@ -19,7 +19,6 @@ package org.apache.helix; * under the License. */ -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,8 +29,8 @@ import java.util.TreeMap; import org.apache.helix.manager.zk.GenericZkHelixApiBuilder; import org.apache.helix.manager.zk.ZKUtil; -import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CloudConfig; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ConfigScope; import org.apache.helix.model.CustomizedStateConfig; import org.apache.helix.model.HelixConfigScope; @@ -40,8 +39,8 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.RESTConfig; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.util.HelixUtil; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.util.HelixUtil; import org.apache.helix.util.StringTemplate; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; @@ -120,7 +119,7 @@ public class ConfigAccessor { new RealmAwareZkClient.RealmAwareZkClientConfig() .setZkSerializer(new ZNRecordSerializer())); return; - } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + } catch (InvalidRoutingDataException | IllegalStateException e) { throw new HelixException("Failed to create ConfigAccessor!", e); } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java index 054e693..a75c8cf 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java @@ -19,7 +19,6 @@ package org.apache.helix.manager.zk; * under the License. */ -import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixException; @@ -83,7 +82,7 @@ public class GenericBaseDataAccessorBuilder<B extends GenericBaseDataAccessorBui case MULTI_REALM: try { zkClient = new FederatedZkClient(connectionConfig, clientConfig); - } catch (IOException | InvalidRoutingDataException e) { + } catch (InvalidRoutingDataException e) { throw new HelixException("Not able to connect on multi-realm mode.", e); } break; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java index 840ec8f..d02f67e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java @@ -19,8 +19,6 @@ package org.apache.helix.manager.zk; * under the License. */ -import java.io.IOException; - import org.apache.helix.HelixException; import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; @@ -109,7 +107,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde try { _zkAddress = resolveZkAddressWithShardingKey(_realmAwareZkConnectionConfig); isZkAddressSet = true; - } catch (IOException | InvalidRoutingDataException e) { + } catch (InvalidRoutingDataException e) { LOG.warn( "GenericZkHelixApiBuilder: ZkAddress is not set and failed to resolve ZkAddress with ZK path sharding key!", e); @@ -166,7 +164,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde try { return new FederatedZkClient(connectionConfig, clientConfig.setZkSerializer(new ZNRecordSerializer())); - } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + } catch (InvalidRoutingDataException | IllegalStateException e) { throw new HelixException("GenericZkHelixApiBuilder: Failed to create FederatedZkClient!", e); } @@ -197,17 +195,18 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde * ZK address is not given in this Builder. * @param connectionConfig * @return - * @throws IOException * @throws InvalidRoutingDataException */ private String resolveZkAddressWithShardingKey( RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) - throws IOException, InvalidRoutingDataException { - boolean isMsdsEndpointSet = - connectionConfig.getMsdsEndpoint() != null && !connectionConfig.getMsdsEndpoint().isEmpty(); - // TODO: Make RoutingDataReaderType configurable - MetadataStoreRoutingData routingData = isMsdsEndpointSet ? RoutingDataManager - .getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, connectionConfig.getMsdsEndpoint()) + throws InvalidRoutingDataException { + boolean isRoutingDataSourceEndpointSet = + connectionConfig.getRoutingDataSourceEndpoint() != null && !connectionConfig + .getRoutingDataSourceEndpoint().isEmpty(); + MetadataStoreRoutingData routingData = isRoutingDataSourceEndpointSet ? RoutingDataManager + .getMetadataStoreRoutingData( + RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()), + connectionConfig.getRoutingDataSourceEndpoint()) : RoutingDataManager.getMetadataStoreRoutingData(); return routingData.getMetadataStoreRealm(connectionConfig.getZkRealmShardingKey()); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 348f8d8..1759116 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -151,7 +151,7 @@ public class ZKHelixAdmin implements HelixAdmin { try { zkClient = new FederatedZkClient( new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig); - } catch (IllegalStateException | IOException | InvalidRoutingDataException e) { + } catch (IllegalStateException | InvalidRoutingDataException e) { throw new HelixException("Not able to connect on multi-realm mode.", e); } } else { @@ -965,13 +965,15 @@ public class ZKHelixAdmin implements HelixAdmin { || _zkClient instanceof FederatedZkClient) { // If on multi-zk mode, we retrieve cluster information from Metadata Store Directory Service. Map<String, List<String>> realmToShardingKeys; - String msdsEndpoint = _zkClient.getRealmAwareZkConnectionConfig().getMsdsEndpoint(); - if (msdsEndpoint == null || msdsEndpoint.isEmpty()) { + String routingDataSourceEndpoint = + _zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceEndpoint(); + if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) { + // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties realmToShardingKeys = RoutingDataManager.getRawRoutingData(); } else { - // TODO: Make RoutingDataReaderType configurable - realmToShardingKeys = - RoutingDataManager.getRawRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint); + realmToShardingKeys = RoutingDataManager.getRawRoutingData(RoutingDataReaderType + .lookUp(_zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()), + routingDataSourceEndpoint); } if (realmToShardingKeys == null || realmToShardingKeys.isEmpty()) { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index cda5f39..1015834 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -19,7 +19,6 @@ package org.apache.helix.manager.zk; * under the License. */ -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -622,7 +621,7 @@ public final class ZKUtil { RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); return new FederatedZkClient(connectionConfig, clientConfig); - } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) { + } catch (IllegalArgumentException | InvalidRoutingDataException e) { throw new HelixException("Not able to connect on realm-aware mode", e); } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index d2096b0..4e6a7b0 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -19,7 +19,6 @@ package org.apache.helix.manager.zk; * under the License. */ -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +39,7 @@ import org.apache.helix.util.HelixUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; @@ -52,9 +52,8 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; -import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; @@ -1342,7 +1341,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { try { return new FederatedZkClient( new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig); - } catch (IllegalStateException | IOException | InvalidRoutingDataException e) { + } catch (IllegalStateException | InvalidRoutingDataException e) { throw new HelixException("Not able to connect on multi-realm mode.", e); } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java index 8f0aa16..efdde81 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java @@ -94,7 +94,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable { RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig(); _zkClient = new FederatedZkClient(connectionConfig, clientConfig); - } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) { + } catch (IllegalArgumentException | InvalidRoutingDataException e) { throw new HelixException("Not able to connect on realm-aware mode", e); } } else { diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 59180eb..667cab7 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -40,12 +40,10 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.cloud.azure.AzureConstants; -import org.apache.helix.cloud.constants.CloudProvider; import org.apache.helix.PropertyKey; import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.cloud.azure.AzureConstants; +import org.apache.helix.cloud.constants.CloudProvider; import org.apache.helix.manager.zk.GenericZkHelixApiBuilder; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -162,7 +160,7 @@ public class ClusterSetup { new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), new RealmAwareZkClient.RealmAwareZkClientConfig() .setZkSerializer(new ZNRecordSerializer())); - } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + } catch (InvalidRoutingDataException | IllegalStateException e) { throw new HelixException("Failed to create ConfigAccessor!", e); } } else { diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java index 5146ce6..d72318a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java +++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java @@ -20,6 +20,7 @@ package org.apache.helix.integration.multizk; */ import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -73,11 +74,14 @@ import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.api.client.ZkClientType; +import org.apache.helix.zookeeper.constant.RoutingDataReaderType; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.exception.MultiZkException; import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.helix.zookeeper.routing.RoutingDataManager; import org.apache.helix.zookeeper.zkclient.ZkServer; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -109,16 +113,17 @@ public class TestMultiZkHelixJavaApis { private HelixAdmin _zkHelixAdmin; // Save System property configs from before this test and pass onto after the test - private Map<String, String> _configStore = new HashMap<>(); + private final Map<String, String> _configStore = new HashMap<>(); + + private static final String ZK_PREFIX = "localhost:"; + private static final int ZK_START_PORT = 8777; + private String _msdsEndpoint; @BeforeClass public void beforeClass() throws Exception { // Create 3 in-memory zookeepers and routing mapping - final String zkPrefix = "localhost:"; - final int zkStartPort = 8777; - for (int i = 0; i < NUM_ZK; i++) { - String zkAddress = zkPrefix + (zkStartPort + i); + String zkAddress = ZK_PREFIX + (ZK_START_PORT + i); ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress)); ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance() .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), @@ -132,6 +137,8 @@ public class TestMultiZkHelixJavaApis { final String msdsHostName = "localhost"; final int msdsPort = 11117; final String msdsNamespace = "multiZkTest"; + _msdsEndpoint = + "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace; _msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace, _rawRoutingData); _msds.startServer(); @@ -151,8 +158,7 @@ public class TestMultiZkHelixJavaApis { // Turn on multiZk mode in System config System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true"); // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest - System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, - "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace); + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint); // Create a FederatedZkClient for admin work _zkClient = @@ -782,7 +788,8 @@ public class TestMultiZkHelixJavaApis { new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData); final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder() - .setMsdsEndpoint(secondMsds.getEndpoint()).build(); + .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()) + .setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build(); secondMsds.startServer(); try { @@ -996,4 +1003,69 @@ public class TestMultiZkHelixJavaApis { } return sb.toString(); } + + /** + * Testing using ZK as the routing data source. We use BaseDataAccessor as the representative + * Helix API. + * Two modes are tested: ZK and HTTP-ZK fallback + */ + @Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs") + public void testZkRoutingDataSourceConfigs() { + // Set up routing data in ZK by connecting directly to ZK + BaseDataAccessor<ZNRecord> accessor = + new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build(); + + // Create ZK realm routing data ZNRecord + _rawRoutingData.forEach((realm, keys) -> { + ZNRecord znRecord = new ZNRecord(realm); + znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, + new ArrayList<>(keys)); + accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord, + AccessOption.PERSISTENT); + }); + + // Create connection configs with the source type set to each type + final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); + final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk = + connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name()) + .setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build(); + final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback = + connectionConfigBuilder + .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()) + .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build(); + final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp = + connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()) + .setRoutingDataSourceEndpoint(_msdsEndpoint).build(); + + // Reset cached routing data + RoutingDataManager.reset(); + // Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK + _msds.stopServer(); + + // Create a BaseDataAccessor instance with the connection config + BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>() + .setRealmAwareZkConnectionConfig(connectionConfigZk).build(); + BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor = + new ZkBaseDataAccessor.Builder<ZNRecord>() + .setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build(); + try { + BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>() + .setRealmAwareZkConnectionConfig(connectionConfigHttp).build(); + Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused."); + } catch (MultiZkException e) { + // Okay + } + + // Check that all clusters appear as existing to this accessor + CLUSTER_LIST.forEach(cluster -> { + Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT)); + Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT)); + }); + + // Close all connections + accessor.close(); + zkBasedAccessor.close(); + httpZkFallbackBasedAccessor.close(); + } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java index f5d8915..176180f 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java @@ -20,7 +20,6 @@ package org.apache.helix.rest.server; * under the License. */ -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -134,13 +133,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); // If MSDS endpoint is set for this namespace, use that instead. if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) { - connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint); + connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint); } _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), new RealmAwareZkClient.RealmAwareZkClientConfig() .setZkSerializer(new ZNRecordSerializer())); LOG.info("ServerContext: FederatedZkClient created successfully!"); - } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + } catch (InvalidRoutingDataException | IllegalStateException e) { throw new HelixException("Failed to create FederatedZkClient!", e); } } else { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index b1ecc38..e8c91f1 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; +import org.apache.helix.zookeeper.constant.RoutingDataReaderType; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; @@ -56,8 +57,7 @@ public interface RealmAwareZkClient { * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException. */ enum RealmMode { - SINGLE_REALM, - MULTI_REALM + SINGLE_REALM, MULTI_REALM } int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE; @@ -65,6 +65,7 @@ public interface RealmAwareZkClient { int DEFAULT_SESSION_TIMEOUT = 30 * 1000; // listener subscription + /** * Subscribe the path and the listener will handle child events of the path. * Add exists watch to path if the path does not exist in ZooKeeper server. @@ -86,7 +87,8 @@ public interface RealmAwareZkClient { * @return ChildrentSubsribeResult. If the path does not exists, the isInstalled field * is false. Otherwise, it is true and list of children are returned. */ - ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode); + ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, + boolean skipWatchingNonExistNode); void unsubscribeChildChanges(String path, IZkChildListener listener); @@ -108,7 +110,8 @@ public interface RealmAwareZkClient { * @param skipWatchingNonExistNode True means not installing any watch if path does not exist. * return True if installation of watch succeed. Otherwise, return false. */ - boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode); + boolean subscribeDataChanges(String path, IZkDataListener listener, + boolean skipWatchingNonExistNode); void unsubscribeDataChanges(String path, IZkDataListener listener); @@ -378,12 +381,14 @@ public interface RealmAwareZkClient { * NOTE: this field will be ignored if RealmMode is MULTI_REALM! */ private String _zkRealmShardingKey; - private String _msdsEndpoint; + private String _routingDataSourceType; + private String _routingDataSourceEndpoint; private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT; private RealmAwareZkConnectionConfig(Builder builder) { _zkRealmShardingKey = builder._zkRealmShardingKey; - _msdsEndpoint = builder._msdsEndpoint; + _routingDataSourceType = builder._routingDataSourceType; + _routingDataSourceEndpoint = builder._routingDataSourceEndpoint; _sessionTimeout = builder._sessionTimeout; } @@ -424,14 +429,19 @@ public interface RealmAwareZkClient { return _sessionTimeout; } - public String getMsdsEndpoint() { - return _msdsEndpoint; + public String getRoutingDataSourceType() { + return _routingDataSourceType; + } + + public String getRoutingDataSourceEndpoint() { + return _routingDataSourceEndpoint; } public static class Builder { private RealmMode _realmMode; private String _zkRealmShardingKey; - private String _msdsEndpoint; + private String _routingDataSourceType; + private String _routingDataSourceEndpoint; private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT; public Builder() { @@ -453,8 +463,13 @@ public interface RealmAwareZkClient { return this; } - public Builder setMsdsEndpoint(String msdsEndpoint) { - _msdsEndpoint = msdsEndpoint; + public Builder setRoutingDataSourceType(String routingDataSourceType) { + _routingDataSourceType = routingDataSourceType; + return this; + } + + public Builder setRoutingDataSourceEndpoint(String routingDataSourceEndpoint) { + _routingDataSourceEndpoint = routingDataSourceEndpoint; return this; } @@ -482,6 +497,13 @@ public interface RealmAwareZkClient { throw new IllegalArgumentException( "RealmAwareZkConnectionConfig.Builder: ZK sharding key must be set on single-realm mode!"); } + if ((_routingDataSourceEndpoint == null && _routingDataSourceType != null) || ( + _routingDataSourceEndpoint != null && _routingDataSourceType == null)) { + // For routing data source type and endpoint, if one is set and not the other, it is invalid + throw new IllegalArgumentException( + "RealmAwareZkConnectionConfig.Builder: routing data source type and endpoint are not configured properly! Type: " + + _routingDataSourceType + " Endpoint: " + _routingDataSourceEndpoint); + } } } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java index aedef36..33b6c70 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java @@ -19,6 +19,10 @@ package org.apache.helix.zookeeper.constant; * under the License. */ +import org.apache.commons.lang3.EnumUtils; +import org.apache.helix.zookeeper.exception.MultiZkException; + + /** * RoutingDataReaderType is an enum that designates the reader type and the class name that can be * used to create an instance of RoutingDataReader by reflection. @@ -37,4 +41,14 @@ public enum RoutingDataReaderType { public String getClassName() { return this.className; } + + public static RoutingDataReaderType lookUp(String enumString) { + RoutingDataReaderType type = + EnumUtils.getEnumIgnoreCase(RoutingDataReaderType.class, enumString); + if (type == null) { + throw new MultiZkException( + "RoutingDataReaderType::lookUp: Unable to find the enum! String given: " + enumString); + } + return type; + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 501670c..dbe64f3 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client; * under the License. */ -import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; @@ -69,12 +68,10 @@ public class DedicatedZkClient implements RealmAwareZkClient { * such as CRUD, change callback, and ephemeral operations for a single ZkRealmShardingKey. * @param connectionConfig * @param clientConfig - * @throws IOException * @throws InvalidRoutingDataException */ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) - throws IOException, InvalidRoutingDataException { + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException { if (connectionConfig == null) { throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!"); } @@ -84,14 +81,15 @@ public class DedicatedZkClient implements RealmAwareZkClient { _connectionConfig = connectionConfig; _clientConfig = clientConfig; - // Get the routing data from a static Singleton HttpRoutingDataReader - String msdsEndpoint = connectionConfig.getMsdsEndpoint(); - if (msdsEndpoint == null || msdsEndpoint.isEmpty()) { + // Get MetadataStoreRoutingData + String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint(); + if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) { + // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(); } else { - // TODO: Make RoutingDataReaderType configurable - _metadataStoreRoutingData = - RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint); + _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData( + RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()), + routingDataSourceEndpoint); } _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey(); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 22b9fe5..01e7fc7 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client; * under the License. */ -import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -87,8 +86,7 @@ public class FederatedZkClient implements RealmAwareZkClient { // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection. public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) - throws IOException, InvalidRoutingDataException { + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException { if (connectionConfig == null) { throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!"); } @@ -96,14 +94,15 @@ public class FederatedZkClient implements RealmAwareZkClient { throw new IllegalArgumentException("RealmAwareZkClientConfig cannot be null!"); } - // Attempt to get MetadataStoreRoutingData - String msdsEndpoint = connectionConfig.getMsdsEndpoint(); - if (msdsEndpoint == null || msdsEndpoint.isEmpty()) { + // Get MetadataStoreRoutingData + String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint(); + if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) { + // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(); } else { - // TODO: Make RoutingDataReaderType configurable - _metadataStoreRoutingData = - RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint); + _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData( + RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()), + routingDataSourceEndpoint); } _isClosed = false; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java index 341731e..7c3a562 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java @@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client; * under the License. */ -import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; @@ -65,8 +64,7 @@ public class SharedZkClient implements RealmAwareZkClient { private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig; public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) - throws IOException, InvalidRoutingDataException { + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException { if (connectionConfig == null) { throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!"); } @@ -76,14 +74,15 @@ public class SharedZkClient implements RealmAwareZkClient { _connectionConfig = connectionConfig; _clientConfig = clientConfig; - // Get the routing data from a static Singleton HttpRoutingDataReader - String msdsEndpoint = connectionConfig.getMsdsEndpoint(); - if (msdsEndpoint == null || msdsEndpoint.isEmpty()) { + // Get MetadataStoreRoutingData + String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint(); + if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) { + // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(); } else { - // TODO: Make RoutingDataReaderType configurable - _metadataStoreRoutingData = - RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint); + _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData( + RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()), + routingDataSourceEndpoint); } _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
