This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 2d310f31b6f64fea7ef8243be553379a45defb7a Author: Hunter Lee <[email protected]> AuthorDate: Wed Mar 25 22:20:10 2020 -0700 Make Helix REST realm-aware (#908) Helix REST needs to start using a realm-aware ZkClient on multi-zk mode. Also it needs to become a listener on routing data because we don't want to restart the HelixRestServer every time we update the routing data. Changelist: Make ServerContext listen on routing data paths if run on multi-zk mode Make HelixRestServer use RealmAwareZkClient (FederatedZkClient) on multi-zk mode --- .../java/org/apache/helix/task/TaskDriver.java | 6 +- .../helix/rest/common/HelixRestNamespace.java | 24 +- .../metadatastore/ZkMetadataStoreDirectory.java | 55 +++- .../accessor/ZkRoutingDataReader.java | 37 +-- .../accessor/ZkRoutingDataWriter.java | 17 +- .../apache/helix/rest/server/HelixRestMain.java | 6 +- .../apache/helix/rest/server/HelixRestServer.java | 2 +- .../apache/helix/rest/server/ServerContext.java | 289 +++++++++++++++++---- .../resources/helix/AbstractHelixResource.java | 14 +- .../server/resources/helix/ClusterAccessor.java | 16 +- .../server/resources/helix/ResourceAccessor.java | 10 +- .../integration/TestRoutingDataUpdate.java | 176 +++++++++++++ .../helix/rest/server/AbstractTestClass.java | 40 +-- .../constant/MetadataStoreRoutingConstants.java | 2 + .../helix/msdcommon/util/ZkValidationUtil.java | 3 +- pom.xml | 34 --- .../zookeeper/api/client/RealmAwareZkClient.java | 16 ++ .../zookeeper/util/HttpRoutingDataReader.java | 14 +- .../impl/client/RealmAwareZkClientTestBase.java | 2 +- 19 files changed, 573 insertions(+), 190 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 987cc44..506a06b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -47,7 +47,7 @@ import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.helix.store.HelixPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.util.HelixUtil; -import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.slf4j.Logger; @@ -98,12 +98,12 @@ public class TaskDriver { } @Deprecated - public TaskDriver(HelixZkClient client, String clusterName) { + public TaskDriver(RealmAwareZkClient client, String clusterName) { this(client, new ZkBaseDataAccessor<>(client), clusterName); } @Deprecated - public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, + public TaskDriver(RealmAwareZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) { this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor), new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName), diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java index a2fb52c..0632f36 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java @@ -34,7 +34,8 @@ public class HelixRestNamespace { NAME, METADATA_STORE_TYPE, METADATA_STORE_ADDRESS, - IS_DEFAULT + IS_DEFAULT, + MSDS_ENDPOINT } /** @@ -55,7 +56,7 @@ public class HelixRestNamespace { private HelixMetadataStoreType _metadataStoreType; /** - * Address of metadata store. Should be informat of + * Address of metadata store. Should be in the format of * "[ip-address]:[port]" or "[dns-name]:[port]" */ private String _metadataStoreAddress; @@ -65,16 +66,27 @@ public class HelixRestNamespace { */ private boolean _isDefault; + /** + * Endpoint for accessing MSDS for this namespace. + */ + private String _msdsEndpoint; + public HelixRestNamespace(String metadataStoreAddress) throws IllegalArgumentException { this(DEFAULT_NAMESPACE_NAME, HelixMetadataStoreType.ZOOKEEPER, metadataStoreAddress, true); } - public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, String metadataStoreAddress, boolean isDefault) - throws IllegalArgumentException { + public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, + String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException { + this(name, metadataStoreType, metadataStoreAddress, isDefault, null); + } + + public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, + String metadataStoreAddress, boolean isDefault, String msdsEndpoint) { _name = name; _metadataStoreAddress = metadataStoreAddress; _metadataStoreType = metadataStoreType; _isDefault = isDefault; + _msdsEndpoint = msdsEndpoint; validate(); } @@ -109,4 +121,8 @@ public class HelixRestNamespace { ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault)); return ret; } + + public String getMsdsEndpoint() { + return _msdsEndpoint; + } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java index c83245f..42b2b17 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java @@ -39,6 +39,7 @@ import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWrit import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader; import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; @@ -98,20 +99,27 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing if (!_routingZkAddressMap.containsKey(namespace)) { synchronized (_routingZkAddressMap) { if (!_routingZkAddressMap.containsKey(namespace)) { - // Ensure that ROUTING_DATA_PATH exists in ZK. - HelixZkClient zkClient = DedicatedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), - new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); + HelixZkClient zkClient = null; try { - zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true); - } catch (ZkNodeExistsException e) { - // The node already exists and it's okay + // Ensure that ROUTING_DATA_PATH exists in ZK. + zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), + new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); + createRoutingDataPath(zkClient, zkAddress); + } finally { + if (zkClient != null && !zkClient.isClosed()) { + zkClient.close(); + } + } + try { + _routingZkAddressMap.put(namespace, zkAddress); + _routingDataReaderMap + .put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this)); + _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress)); + } catch (IllegalArgumentException | IllegalStateException e) { + LOG.error("ZkMetadataStoreDirectory: initializing ZkRoutingDataReader/Writer failed!", + e); } - - _routingZkAddressMap.put(namespace, zkAddress); - _routingDataReaderMap.put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this)); - _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress)); - // Populate realmToShardingKeys with ZkRoutingDataReader Map<String, List<String>> rawRoutingData = _routingDataReaderMap.get(namespace).getRoutingData(); @@ -119,7 +127,8 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing try { _routingDataMap.put(namespace, new TrieRoutingData(rawRoutingData)); } catch (InvalidRoutingDataException e) { - LOG.warn("TrieRoutingData is not created for namespace {}", namespace, e); + LOG.warn("ZkMetadataStoreDirectory: TrieRoutingData is not created for namespace {}", + namespace, e); } } } @@ -145,7 +154,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing throw new NoSuchElementException("Namespace " + namespace + " does not exist!"); } Set<String> allShardingKeys = new HashSet<>(); - _realmToShardingKeysMap.get(namespace).values().forEach(keys -> allShardingKeys.addAll(keys)); + _realmToShardingKeysMap.get(namespace).values().forEach(allShardingKeys::addAll); return allShardingKeys; } @@ -339,4 +348,22 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing _routingDataMap.clear(); _zkMetadataStoreDirectoryInstance = null; } + + /** + * Make sure the root routing data path exists. Also, register the routing ZK address. + * @param zkClient + */ + public static void createRoutingDataPath(HelixZkClient zkClient, String zkAddress) { + try { + zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true); + } catch (ZkNodeExistsException e) { + // The node already exists and it's okay + } + // Make sure ROUTING_DATA_PATH is mapped to the routing ZK so that FederatedZkClient used + // in Helix REST can subscribe to the routing data path + ZNRecord znRecord = new ZNRecord(MetadataStoreRoutingConstants.ROUTING_DATA_PATH.substring(1)); + znRecord.setListField(MetadataStoreRoutingConstants.ROUTING_ZK_ADDRESS_KEY, + Collections.singletonList(zkAddress)); + zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, znRecord); + } } \ No newline at end of file diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java index 6c75618..cfe6eb5 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java @@ -27,7 +27,9 @@ import java.util.Map; import org.apache.helix.msdcommon.callback.RoutingDataListener; import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; @@ -35,7 +37,6 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; -import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; import org.apache.zookeeper.Watcher; @@ -59,24 +60,11 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); - // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create - // create() semantic will fail if it already exists - try { - _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true); - } catch (ZkNodeExistsException e) { - // This is okay - } + ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, _zkAddress); _routingDataListener = routingDataListener; if (_routingDataListener != null) { - // Subscribe child changes - _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this); - // Subscribe data changes - for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) { - _zkClient - .subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child, - this); - } + _zkClient.subscribeRoutingDataChanges(this, this); } } @@ -118,7 +106,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD @Override public synchronized void handleDataChange(String s, Object o) { - if (_zkClient.isClosed()) { + if (_zkClient == null || _zkClient.isClosed()) { return; } _routingDataListener.refreshRoutingData(_namespace); @@ -138,7 +126,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD @Override public synchronized void handleStateChanged(Watcher.Event.KeeperState state) { - if (_zkClient.isClosed()) { + if (_zkClient == null || _zkClient.isClosed()) { return; } _routingDataListener.refreshRoutingData(_namespace); @@ -146,7 +134,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD @Override public synchronized void handleNewSession(String sessionId) { - if (_zkClient.isClosed()) { + if (_zkClient == null || _zkClient.isClosed()) { return; } _routingDataListener.refreshRoutingData(_namespace); @@ -154,24 +142,19 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD @Override public synchronized void handleSessionEstablishmentError(Throwable error) { - if (_zkClient.isClosed()) { + if (_zkClient == null || _zkClient.isClosed()) { return; } _routingDataListener.refreshRoutingData(_namespace); } private void handleResubscription() { - if (_zkClient.isClosed()) { + if (_zkClient == null || _zkClient.isClosed()) { return; } - // Renew subscription _zkClient.unsubscribeAll(); - _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this); - for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) { - _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child, - this); - } + _zkClient.subscribeRoutingDataChanges(this, this); _routingDataListener.refreshRoutingData(_namespace); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java index 32b7681..791d9bb 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java @@ -30,6 +30,7 @@ import javax.ws.rs.core.Response; import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; import org.apache.helix.rest.common.HttpConstants; +import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -82,20 +83,16 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter { .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())); - // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create - // create() semantic will fail if it already exists - try { - _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true); - } catch (ZkNodeExistsException e) { - // This is okay - } + ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, zkAddress); // Get the hostname (REST endpoint) from System property String hostName = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY); if (hostName == null || hostName.isEmpty()) { - throw new IllegalStateException( - "Hostname is not set or is empty. System.getProperty fails to fetch " - + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY + "."); + String errMsg = + "ZkRoutingDataWriter: Hostname is not set or is empty. System.getProperty fails to fetch " + + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY; + LOG.error(errMsg); + throw new IllegalStateException(errMsg); } _myHostName = HttpConstants.HTTP_PROTOCOL_PREFIX + hostName; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java index b28f227..49940c3 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java @@ -146,10 +146,12 @@ public class HelixRestMain { // Currently we don't support adding default namespace through yaml manifest so all // namespaces created here will not be default // TODO: support specifying default namespace from config file - namespaces.add(new HelixRestNamespace(config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()), + namespaces.add(new HelixRestNamespace( + config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()), HelixRestNamespace.HelixMetadataStoreType.valueOf( config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())), - config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()), false)); + config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()), + false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name()))); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java index 64c1139..e6b5b34 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java @@ -149,7 +149,7 @@ public class HelixRestServer { // Enable the default statistical monitoring MBean for Jersey server cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true); cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(), - new ServerContext(namespace.getMetadataStoreAddress())); + new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint())); if (type == ServletType.DEFAULT_SERVLET) { cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces); } else { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java index b845356..52f1738 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java @@ -20,34 +20,53 @@ package org.apache.helix.rest.server; * under the License. */ -import java.util.HashMap; +import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.InstanceType; -import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; -import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.zookeeper.impl.client.ZkClient; -import org.apache.helix.zookeeper.api.client.HelixZkClient; -import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; import org.apache.helix.task.TaskDriver; import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; +import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.helix.zookeeper.util.HttpRoutingDataReader; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServerContext implements IZkDataListener, IZkChildListener, IZkStateListener { + private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class); -public class ServerContext { private final String _zkAddr; - private HelixZkClient _zkClient; - private ZKHelixAdmin _zkHelixAdmin; - private ClusterSetup _clusterSetup; - private ConfigAccessor _configAccessor; + private final String _msdsEndpoint; + private volatile RealmAwareZkClient _zkClient; + + private volatile ZKHelixAdmin _zkHelixAdmin; + private volatile ClusterSetup _clusterSetup; + private volatile ConfigAccessor _configAccessor; // A lazily-initialized base data accessor that reads/writes byte array to ZK // TODO: Only read (deserialize) is supported at this time. This baseDataAccessor should support write (serialize) as needs arise private volatile ZkBaseDataAccessor<byte[]> _byteArrayZkBaseDataAccessor; @@ -55,76 +74,149 @@ public class ServerContext { private final Map<String, HelixDataAccessor> _helixDataAccessorPool; // 1 Cluster name will correspond to 1 task driver private final Map<String, TaskDriver> _taskDriverPool; + + /** + * Multi-ZK support + */ private ZkMetadataStoreDirectory _zkMetadataStoreDirectory; + // Create a dedicated ZkClient for listening to data changes in routing data + private RealmAwareZkClient _zkClientForListener; public ServerContext(String zkAddr) { + this(zkAddr, null); + } + + /** + * Initializes a ServerContext for this namespace. + * @param zkAddr routing ZK address (on multi-zk mode) + * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS. + */ + public ServerContext(String zkAddr, String msdsEndpoint) { _zkAddr = zkAddr; + _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode // We should NOT initiate _zkClient and anything that depends on _zkClient in // constructor, as it is reasonable to start up HelixRestServer first and then // ZooKeeper. In this case, initializing _zkClient will fail and HelixRestServer // cannot be started correctly. - _helixDataAccessorPool = new HashMap<>(); - _taskDriverPool = new HashMap<>(); + _helixDataAccessorPool = new ConcurrentHashMap<>(); + _taskDriverPool = new ConcurrentHashMap<>(); + // Initialize the singleton ZkMetadataStoreDirectory instance to allow it to be closed later _zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance(); } - public HelixZkClient getHelixZkClient() { + public RealmAwareZkClient getRealmAwareZkClient() { if (_zkClient == null) { - HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); - clientConfig.setZkSerializer(new ZNRecordSerializer()); - _zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig); + synchronized (this) { + if (_zkClient == null) { + // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode + if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) { + LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!", + _zkAddr); + try { + RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); + // If MSDS endpoint is set for this namespace, use that instead. + if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) { + connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint); + } + _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), + new RealmAwareZkClient.RealmAwareZkClientConfig() + .setZkSerializer(new ZNRecordSerializer())); + + // Make sure the ServerContext is subscribed to routing data change so that it knows + // when to reset ZkClient and Helix APIs + if (_zkClientForListener == null) { + _zkClientForListener = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), + new HelixZkClient.ZkClientConfig() + .setZkSerializer(new ZNRecordSerializer())); + } + // Refresh data subscription + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + throw new HelixException("Failed to create FederatedZkClient!", e); + } + } else { + // If multi ZK config is not set, just connect to the ZK address given + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + clientConfig.setZkSerializer(new ZNRecordSerializer()); + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig); + } + } + } } return _zkClient; } @Deprecated public ZkClient getZkClient() { - return (ZkClient) getHelixZkClient(); + return (ZkClient) getRealmAwareZkClient(); } public HelixAdmin getHelixAdmin() { if (_zkHelixAdmin == null) { - _zkHelixAdmin = new ZKHelixAdmin(getHelixZkClient()); + synchronized (this) { + if (_zkHelixAdmin == null) { + _zkHelixAdmin = new ZKHelixAdmin(getRealmAwareZkClient()); + } + } } return _zkHelixAdmin; } public ClusterSetup getClusterSetup() { if (_clusterSetup == null) { - _clusterSetup = new ClusterSetup(getHelixZkClient(), getHelixAdmin()); + synchronized (this) { + if (_clusterSetup == null) { + _clusterSetup = new ClusterSetup(getRealmAwareZkClient(), getHelixAdmin()); + } + } } return _clusterSetup; } public TaskDriver getTaskDriver(String clusterName) { - synchronized (_taskDriverPool) { - if (!_taskDriverPool.containsKey(clusterName)) { - _taskDriverPool.put(clusterName, new TaskDriver(getHelixZkClient(), clusterName)); + TaskDriver taskDriver = _taskDriverPool.get(clusterName); + if (taskDriver == null) { + synchronized (this) { + if (!_taskDriverPool.containsKey(clusterName)) { + _taskDriverPool.put(clusterName, new TaskDriver(getRealmAwareZkClient(), clusterName)); + } + taskDriver = _taskDriverPool.get(clusterName); } - return _taskDriverPool.get(clusterName); } + return taskDriver; } public ConfigAccessor getConfigAccessor() { if (_configAccessor == null) { - _configAccessor = new ConfigAccessor(getHelixZkClient()); + synchronized (this) { + if (_configAccessor == null) { + _configAccessor = new ConfigAccessor(getRealmAwareZkClient()); + } + } } return _configAccessor; } - public HelixDataAccessor getDataAccssor(String clusterName) { - synchronized (_helixDataAccessorPool) { - if (!_helixDataAccessorPool.containsKey(clusterName)) { - ZkBaseDataAccessor<ZNRecord> baseDataAccessor = - new ZkBaseDataAccessor<>(getHelixZkClient()); - _helixDataAccessorPool.put(clusterName, - new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor)); + public HelixDataAccessor getDataAccessor(String clusterName) { + HelixDataAccessor dataAccessor = _helixDataAccessorPool.get(clusterName); + if (dataAccessor == null) { + synchronized (this) { + if (!_helixDataAccessorPool.containsKey(clusterName)) { + ZkBaseDataAccessor<ZNRecord> baseDataAccessor = + new ZkBaseDataAccessor<>(getRealmAwareZkClient()); + _helixDataAccessorPool.put(clusterName, + new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor)); + } + dataAccessor = _helixDataAccessorPool.get(clusterName); } - return _helixDataAccessorPool.get(clusterName); } + return dataAccessor; } /** @@ -132,30 +224,26 @@ public class ServerContext { * @return */ public ZkBaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() { - ZkBaseDataAccessor<byte[]> byteArrayZkBaseDataAccessor = _byteArrayZkBaseDataAccessor; - if (byteArrayZkBaseDataAccessor != null) { // First check (no locking) - return byteArrayZkBaseDataAccessor; - } + if (_byteArrayZkBaseDataAccessor == null) { + synchronized (this) { + if (_byteArrayZkBaseDataAccessor == null) { - synchronized (this) { - if (_byteArrayZkBaseDataAccessor == null) { // Second check (with locking) - _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() { - @Override - public byte[] serialize(Object o) - throws ZkMarshallingError { - // TODO: Support serialize for write methods if necessary - throw new UnsupportedOperationException("serialize() is not supported."); - } + _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() { + @Override + public byte[] serialize(Object o) throws ZkMarshallingError { + // TODO: Support serialize for write methods if necessary + throw new UnsupportedOperationException("serialize() is not supported."); + } - @Override - public Object deserialize(byte[] bytes) - throws ZkMarshallingError { - return bytes; - } - }); + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + return bytes; + } + }); + } } - return _byteArrayZkBaseDataAccessor; } + return _byteArrayZkBaseDataAccessor; } public void close() { @@ -165,5 +253,96 @@ public class ServerContext { if (_zkMetadataStoreDirectory != null) { _zkMetadataStoreDirectory.close(); } + if (_zkClientForListener != null) { + _zkClientForListener.close(); + } + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) { + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + // Resubscribe + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + resetZkResources(); + } + + @Override + public void handleDataChange(String dataPath, Object data) { + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + resetZkResources(); + } + + @Override + public void handleDataDeleted(String dataPath) { + // NOP because this is covered by handleChildChange() + } + + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) { + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + // Resubscribe + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + resetZkResources(); + } + + @Override + public void handleNewSession(String sessionId) { + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + // Resubscribe + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + resetZkResources(); + } + + @Override + public void handleSessionEstablishmentError(Throwable error) { + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + // Resubscribe + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + resetZkResources(); + } + + /** + * Resets all internally cached routing data by closing and nullifying the ZkClient and Helix APIs. + * This is okay because routing data update should be infrequent. + */ + private void resetZkResources() { + synchronized (this) { + LOG.info("ServerContext: Resetting ZK resources due to routing data change! Routing ZK: {}", + _zkAddr); + try { + // Reset HttpRoutingDataReader's cache + HttpRoutingDataReader.reset(); + // All Helix APIs will be closed implicitly because ZkClient is closed + if (_zkClient != null && !_zkClient.isClosed()) { + _zkClient.close(); + } + if (_byteArrayZkBaseDataAccessor != null) { + _byteArrayZkBaseDataAccessor.close(); + } + _zkClient = null; + _zkHelixAdmin = null; + _clusterSetup = null; + _configAccessor = null; + _byteArrayZkBaseDataAccessor = null; + _helixDataAccessorPool.clear(); + _taskDriverPool.clear(); + } catch (Exception e) { + LOG.error("Failed to reset ZkClient and Helix APIs in ServerContext!", e); + } + } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java index f1bb583..487316b 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java @@ -24,15 +24,15 @@ import java.io.IOException; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.zookeeper.impl.client.ZkClient; -import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.rest.common.ContextPropertyKeys; import org.apache.helix.rest.server.ServerContext; import org.apache.helix.rest.server.resources.AbstractResource; import org.apache.helix.task.TaskDriver; import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.client.ZkClient; /** @@ -42,14 +42,14 @@ import org.apache.helix.tools.ClusterSetup; */ public class AbstractHelixResource extends AbstractResource { - public HelixZkClient getHelixZkClient() { + public RealmAwareZkClient getRealmAwareZkClient() { ServerContext serverContext = getServerContext(); - return serverContext.getHelixZkClient(); + return serverContext.getRealmAwareZkClient(); } @Deprecated public ZkClient getZkClient() { - return (ZkClient) getHelixZkClient(); + return (ZkClient) getRealmAwareZkClient(); } public HelixAdmin getHelixAdmin() { @@ -74,7 +74,7 @@ public class AbstractHelixResource extends AbstractResource { public HelixDataAccessor getDataAccssor(String clusterName) { ServerContext serverContext = getServerContext(); - return serverContext.getDataAccssor(clusterName); + return serverContext.getDataAccessor(clusterName); } protected ZkBaseDataAccessor<byte[]> getByteArrayDataAccessor() { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java index 6beeb2c..4ca775c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -44,10 +44,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.model.RESTConfig; -import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.manager.zk.ZKUtil; -import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ControllerHistory; @@ -55,12 +52,15 @@ import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.Message; +import org.apache.helix.model.RESTConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.service.ClusterService; import org.apache.helix.rest.server.service.ClusterServiceImpl; import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -472,7 +472,7 @@ public class ClusterAccessor extends AbstractHelixResource { LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e); return badRequest("Input is not a valid ZNRecord!"); } - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); String path = PropertyPathBuilder.stateModelDef(clusterId); try { ZKUtil.createChildren(zkClient, path, record); @@ -656,7 +656,7 @@ public class ClusterAccessor extends AbstractHelixResource { } private boolean doesClusterExist(String cluster) { - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); return ZKUtil.isClusterSetup(cluster, zkClient); } @@ -664,7 +664,7 @@ public class ClusterAccessor extends AbstractHelixResource { @Path("{clusterId}/cloudconfig") public Response addCloudConfig(@PathParam("clusterId") String clusterId, String content) { - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { return notFound("Cluster is not properly setup!"); } @@ -696,7 +696,7 @@ public class ClusterAccessor extends AbstractHelixResource { @Path("{clusterId}/cloudconfig") public Response getCloudConfig(@PathParam("clusterId") String clusterId) { - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { return notFound(); } @@ -724,7 +724,7 @@ public class ClusterAccessor extends AbstractHelixResource { public Response updateCloudConfig(@PathParam("clusterId") String clusterId, @QueryParam("command") String commandStr, String content) { - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { return notFound(); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java index ca2189e..b8c7c38 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java @@ -41,18 +41,18 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixException; import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.codehaus.jackson.type.TypeReference; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +79,7 @@ public class ResourceAccessor extends AbstractHelixResource { ObjectNode root = JsonNodeFactory.instance.objectNode(); root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name()); ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name()); @@ -109,7 +109,7 @@ public class ResourceAccessor extends AbstractHelixResource { @Path("health") public Response getResourceHealth(@PathParam("clusterId") String clusterId) { - HelixZkClient zkClient = getHelixZkClient(); + RealmAwareZkClient zkClient = getRealmAwareZkClient(); List<String> resourcesInIdealState = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId)); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java new file mode 100644 index 0000000..0babf05 --- /dev/null +++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java @@ -0,0 +1,176 @@ +package org.apache.helix.rest.metadatastore.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; +import org.apache.helix.rest.server.AbstractTestClass; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * TestRoutingDataUpdate tests that Helix REST server's ServerContext gets a proper update whenever + * there is change in the routing data. + */ +public class TestRoutingDataUpdate extends AbstractTestClass { + private static final String CLUSTER_0_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-0"; + private static final String CLUSTER_1_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-1"; + private final Map<String, List<String>> _routingData = new HashMap<>(); + + @BeforeClass + public void beforeClass() { + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY, + getBaseUri().getHost()); + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY, + Integer.toString(getBaseUri().getPort())); + + // Set the multi-zk config + System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true"); + // Set the MSDS address + String msdsEndpoint = getBaseUri().toString(); + System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, msdsEndpoint); + + // Restart Helix Rest server to get a fresh ServerContext created + restartRestServer(); + } + + @AfterClass + public void afterClass() { + // Clear all property + System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY); + System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY); + System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED); + + restartRestServer(); + } + + @Test + public void testRoutingDataUpdate() throws Exception { + // Set up routing data + _routingData.put(ZK_ADDR, Arrays.asList(CLUSTER_0_SHARDING_KEY, CLUSTER_1_SHARDING_KEY)); + _routingData.put(_zkAddrTestNS, new ArrayList<>()); + String routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData); + put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, + Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE), + Response.Status.CREATED.getStatusCode()); + + // Need to wait so that ServerContext processes the callback + // TODO: Think of another way to wait - + // this is only used because of the nature of the testing environment + // in production, the server might return a 500 if a http call comes in before callbacks get + // processed fully + Thread.sleep(500L); + + // Create the first cluster using Helix REST API via ClusterAccessor + put("/clusters" + CLUSTER_0_SHARDING_KEY, null, + Entity.entity("", MediaType.APPLICATION_JSON_TYPE), + Response.Status.CREATED.getStatusCode()); + // Check that the first cluster is created in the first ZK as designated by routing data + Assert.assertTrue(_gZkClient.exists(CLUSTER_0_SHARDING_KEY)); + Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_0_SHARDING_KEY)); + + // Change the routing data mapping so that CLUSTER_1 points to the second ZK + _routingData.clear(); + _routingData.put(ZK_ADDR, Collections.singletonList(CLUSTER_0_SHARDING_KEY)); + _routingData.put(_zkAddrTestNS, Collections.singletonList(CLUSTER_1_SHARDING_KEY)); + routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData); + put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, + Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE), + Response.Status.CREATED.getStatusCode()); + + // Need to wait so that ServerContext processes the callback + // TODO: Think of another way to wait - + // this is only used because of the nature of the testing environment + // in production, the server might return a 500 if a http call comes in before callbacks get + // processed fully + Thread.sleep(500L); + + // Create the second cluster using Helix REST API via ClusterAccessor + put("/clusters" + CLUSTER_1_SHARDING_KEY, null, + Entity.entity("", MediaType.APPLICATION_JSON_TYPE), + Response.Status.CREATED.getStatusCode()); + // Check that the second cluster is created in the second ZK as designated by routing data + Assert.assertTrue(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY)); + Assert.assertFalse(_gZkClient.exists(CLUSTER_1_SHARDING_KEY)); + + // Remove all routing data + put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity + .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()), + MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode()); + + // Need to wait so that ServerContext processes the callback + // TODO: Think of another way to wait - + // this is only used because of the nature of the testing environment + // in production, the server might return a 500 if a http call comes in before callbacks get + // processed fully + Thread.sleep(500L); + + // Delete clusters - both should fail because routing data don't have these clusters + delete("/clusters" + CLUSTER_0_SHARDING_KEY, + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + delete("/clusters" + CLUSTER_1_SHARDING_KEY, + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + + // Set the routing data again + put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, + Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE), + Response.Status.CREATED.getStatusCode()); + + // Need to wait so that ServerContext processes the callback + // TODO: Think of another way to wait - + // this is only used because of the nature of the testing environment + // in production, the server might return a 500 if a http call comes in before callbacks get + // processed fully + Thread.sleep(500L); + + // Attempt deletion again - now they should succeed + delete("/clusters" + CLUSTER_0_SHARDING_KEY, Response.Status.OK.getStatusCode()); + delete("/clusters" + CLUSTER_1_SHARDING_KEY, Response.Status.OK.getStatusCode()); + + // Double-verify using ZkClients + Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY)); + Assert.assertFalse(_gZkClient.exists(CLUSTER_0_SHARDING_KEY)); + + // Remove all routing data + put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity + .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()), + MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode()); + } + + private void restartRestServer() { + if (_helixRestServer != null) { + _helixRestServer.shutdown(); + } + _helixRestServer = startRestServer(); + } +} diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index c5ffd41..0fee10d 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -189,21 +189,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { @Override public void start() { if (_helixRestServer == null) { - // Create namespace manifest map - List<HelixRestNamespace> namespaces = new ArrayList<>(); - // Add test namespace - namespaces.add(new HelixRestNamespace(TEST_NAMESPACE, - HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false)); - // Add default namesapce - namespaces.add(new HelixRestNamespace(ZK_ADDR)); - try { - _helixRestServer = - new HelixRestServer(namespaces, baseUri.getPort(), baseUri.getPath(), - Collections.singletonList(_auditLogger)); - _helixRestServer.start(); - } catch (Exception ex) { - throw new TestContainerException(ex); - } + _helixRestServer = startRestServer(); } } @@ -584,4 +570,28 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { _clusters.add(STOPPABLE_CLUSTER); _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3)); } + + /** + * Starts a HelixRestServer for the test suite. + * @return + */ + protected HelixRestServer startRestServer() { + // Create namespace manifest map + List<HelixRestNamespace> namespaces = new ArrayList<>(); + // Add test namespace + namespaces.add(new HelixRestNamespace(TEST_NAMESPACE, + HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false)); + // Add default namesapce + namespaces.add(new HelixRestNamespace(ZK_ADDR)); + HelixRestServer server; + try { + server = + new HelixRestServer(namespaces, getBaseUri().getPort(), getBaseUri().getPath(), + Collections.singletonList(_auditLogger)); + server.start(); + } catch (Exception ex) { + throw new TestContainerException(ex); + } + return server; + } } diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java index 41d5011..6cceb50 100644 --- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java +++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java @@ -22,6 +22,8 @@ package org.apache.helix.msdcommon.constant; public class MetadataStoreRoutingConstants { public static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA"; + public static final String ROUTING_ZK_ADDRESS_KEY = "ROUTING_ZK_ADDRESS"; + // For ZK only public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS"; diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java index 472b3d9..ab8258d 100644 --- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java +++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java @@ -27,12 +27,13 @@ public class ZkValidationUtil { * / * /abc * /abc/abc/abc/abc + * /abc/localhost:1234 * Invalid matches: * null or empty string * /abc/ * /abc/abc/abc/abc/ **/ public static boolean isPathValid(String path) { - return path.matches("^/|(/[\\w-]+)+$"); + return path.matches("^/|(/[\\w?:-]+)+$"); } } diff --git a/pom.xml b/pom.xml index e8c9bd3..60f8d87 100644 --- a/pom.xml +++ b/pom.xml @@ -630,40 +630,6 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M3</version> - <executions> - <!-- - "executions" enables multiple runs of integration test suites. This is to enable two - runs: 1. run in a single-ZK environment, 2. run in a multi-ZK environment. - "multiZk" is the config accessible via Systems.Properties so that the two runs could be - differentiated. - --> - <execution> - <goals> - <goal>test</goal> - </goals> - <id>default-test</id> - <phase>test</phase> - <configuration> - <rerunFailingTestsCount>3</rerunFailingTestsCount> - <skipAfterFailureCount>10</skipAfterFailureCount> - </configuration> - </execution> - <execution> - <goals> - <goal>test</goal> - </goals> - <id>multi-zk</id> - <phase>test</phase> - <configuration> - <systemPropertyVariables> - <multiZk>true</multiZk> - <numZk>3</numZk> - </systemPropertyVariables> - <rerunFailingTestsCount>3</rerunFailingTestsCount> - <skipAfterFailureCount>10</skipAfterFailureCount> - </configuration> - </execution> - </executions> </plugin> <plugin> <groupId>org.apache.rat</groupId> diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index 0e461b7..ee8c8e3 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.util.HttpRoutingDataReader; @@ -579,4 +580,19 @@ public interface RealmAwareZkClient { .setConnectInitTimeout(_connectInitTimeout); } } + + /** + * Subscribes to the routing data paths using the provided ZkClient. + * Note: this method assumes that the routing data path has already been created. + * @param childListener + * @param dataListener + */ + default void subscribeRoutingDataChanges(IZkChildListener childListener, + IZkDataListener dataListener) { + subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, childListener); + for (String child : getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) { + subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child, + dataListener); + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java index f2f907a..b214cc4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java @@ -87,7 +87,7 @@ public class HttpRoutingDataReader { synchronized (HttpRoutingDataReader.class) { rawRoutingData = _rawRoutingDataMap.get(msdsEndpoint); if (rawRoutingData == null) { - String routingDataJson = getAllRoutingData(); + String routingDataJson = getAllRoutingData(msdsEndpoint); // Update the reference if reading routingData over HTTP is successful rawRoutingData = parseRoutingData(routingDataJson); _rawRoutingDataMap.put(msdsEndpoint, rawRoutingData); @@ -136,16 +136,24 @@ public class HttpRoutingDataReader { } /** + * Clears the statically-cached routing data in HttpRoutingDataReader. + */ + public static void reset() { + _rawRoutingDataMap.clear(); + _metadataStoreRoutingDataMap.clear(); + } + + /** * Makes an HTTP call to fetch all routing data. * @return * @throws IOException */ - private static String getAllRoutingData() throws IOException { + private static String getAllRoutingData(String msdsEndpoint) throws IOException { // Note that MSDS_ENDPOINT should provide high-availability - it risks becoming a single point // of failure if it's backed by a single IP address/host // Retry count is 3 by default. HttpGet requestAllData = new HttpGet( - SYSTEM_MSDS_ENDPOINT + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT); + msdsEndpoint + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT); // Define timeout configs RequestConfig config = RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT_IN_MS) diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java index 900c79f..acb2299 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java @@ -43,7 +43,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase { @BeforeClass public void beforeClass() throws IOException, InvalidRoutingDataException { - // Create a mock MSDS so that HttpRoudingDataReader could fetch the routing data + // Create a mock MSDS so that HttpRoutingDataReader could fetch the routing data if (_msdsServer == null) { // Do not create again if Mock MSDS server has already been created by other tests _msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE,
