KNOX-1013 - Monitor Ambari for Cluster Topology changes (Phil Zampino via lmccay)
Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a874f399 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a874f399 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a874f399 Branch: refs/heads/KNOX-998-Package_Restructuring Commit: a874f399e05835c359ded4ee2c7b822c7baa3231 Parents: 13287d2 Author: Larry McCay <[email protected]> Authored: Tue Dec 5 15:07:16 2017 -0500 Committer: Larry McCay <[email protected]> Committed: Tue Dec 5 15:07:32 2017 -0500 ---------------------------------------------------------------------- .../discovery/ambari/AmbariClientCommon.java | 102 ++++ .../discovery/ambari/AmbariCluster.java | 5 + ...bariClusterConfigurationMonitorProvider.java | 35 ++ .../ambari/AmbariConfigurationMonitor.java | 525 +++++++++++++++++++ .../ambari/AmbariServiceDiscovery.java | 228 ++++---- .../ambari/AmbariServiceDiscoveryMessages.java | 51 +- .../topology/discovery/ambari/RESTInvoker.java | 136 +++++ .../ambari/ServiceURLPropertyConfig.java | 2 +- ...iscovery.ClusterConfigurationMonitorProvider | 19 + .../ambari/AmbariConfigurationMonitorTest.java | 319 +++++++++++ .../ambari/AmbariServiceDiscoveryTest.java | 28 +- gateway-release/home/conf/gateway-site.xml | 12 + .../apache/hadoop/gateway/GatewayMessages.java | 16 + .../gateway/config/impl/GatewayConfigImpl.java | 15 + .../services/DefaultGatewayServices.java | 10 + ...faultClusterConfigurationMonitorService.java | 81 +++ .../topology/impl/DefaultTopologyService.java | 57 +- .../simple/SimpleDescriptorHandler.java | 9 +- .../hadoop/gateway/config/GatewayConfig.java | 18 +- .../gateway/services/GatewayServices.java | 2 + .../ClusterConfigurationMonitorService.java | 43 ++ .../discovery/ClusterConfigurationMonitor.java | 48 ++ .../ClusterConfigurationMonitorProvider.java | 27 + .../hadoop/gateway/GatewayTestConfig.java | 10 + 24 files changed, 1633 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java new file mode 100644 index 0000000..a2bf4ea --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.gateway.topology.discovery.ambari; + +import net.minidev.json.JSONArray; +import net.minidev.json.JSONObject; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.security.AliasService; +import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; + +import java.util.HashMap; +import java.util.Map; + +class AmbariClientCommon { + + static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters"; + + static final String AMBARI_HOSTROLES_URI = + AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles"; + + static final String AMBARI_SERVICECONFIGS_URI = + AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true"; + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + private RESTInvoker restClient; + + + AmbariClientCommon(AliasService aliasService) { + this(new RESTInvoker(aliasService)); + } + + + AmbariClientCommon(RESTInvoker restInvoker) { + this.restClient = restInvoker; + } + + + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String clusterName, + ServiceDiscoveryConfig config) { + return getActiveServiceConfigurations(config.getAddress(), + clusterName, + config.getUser(), + config.getPasswordAlias()); + } + + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String discoveryAddress, + String clusterName, + String discoveryUser, + String discoveryPwdAlias) { + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = new HashMap<>(); + + String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName); + + JSONObject serviceConfigsJSON = restClient.invoke(serviceConfigsURL, discoveryUser, discoveryPwdAlias); + if (serviceConfigsJSON != null) { + // Process the service configurations + JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items"); + for (Object serviceConfig : serviceConfigs) { + String serviceName = (String) ((JSONObject) serviceConfig).get("service_name"); + JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations"); + for (Object configuration : configurations) { + String configType = (String) ((JSONObject) configuration).get("type"); + String configVersion = String.valueOf(((JSONObject) configuration).get("version")); + + Map<String, String> configProps = new HashMap<>(); + JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties"); + for (String propertyName : configProperties.keySet()) { + configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName))); + } + if (!serviceConfigurations.containsKey(serviceName)) { + serviceConfigurations.put(serviceName, new HashMap<>()); + } + serviceConfigurations.get(serviceName).put(configType, + new AmbariCluster.ServiceConfiguration(configType, + configVersion, + configProps)); + } + } + } + + return serviceConfigurations; + } + + +} http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java index c841d9c..1d308cc 100644 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariCluster.java @@ -63,6 +63,11 @@ class AmbariCluster implements ServiceDiscovery.Cluster { } + Map<String, Map<String, ServiceConfiguration>> getServiceConfigurations() { + return serviceConfigurations; + } + + Map<String, AmbariComponent> getComponents() { return components; } http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java new file mode 100644 index 0000000..3b31124 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.gateway.topology.discovery.ambari; + +import org.apache.hadoop.gateway.config.GatewayConfig; +import org.apache.hadoop.gateway.services.security.AliasService; +import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; +import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider; + +public class AmbariClusterConfigurationMonitorProvider implements ClusterConfigurationMonitorProvider { + + @Override + public String getType() { + return AmbariConfigurationMonitor.getType(); + } + + @Override + public ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService) { + return new AmbariConfigurationMonitor(config, aliasService); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java new file mode 100644 index 0000000..e4b5e43 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.gateway.topology.discovery.ambari; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.gateway.config.GatewayConfig; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.security.AliasService; +import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; +import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +class AmbariConfigurationMonitor implements ClusterConfigurationMonitor { + + private static final String TYPE = "Ambari"; + + private static final String CLUSTERS_DATA_DIR_NAME = "clusters"; + + private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!"; + + private static final String PROP_CLUSTER_PREFIX = "cluster."; + private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source"; + private static final String PROP_CLUSTER_NAME = PROP_CLUSTER_PREFIX + "name"; + private static final String PROP_CLUSTER_USER = PROP_CLUSTER_PREFIX + "user"; + private static final String PROP_CLUSTER_ALIAS = PROP_CLUSTER_PREFIX + "pwd.alias"; + + static final String INTERVAL_PROPERTY_NAME = "org.apache.hadoop.gateway.topology.discovery.ambari.monitor.interval"; + + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + // Ambari address + // clusterName -> ServiceDiscoveryConfig + // + Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>(); + + // Ambari address + // clusterName + // configType -> version + // + Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>(); + + ReadWriteLock configVersionsLock = new ReentrantReadWriteLock(); + + private List<ConfigurationChangeListener> changeListeners = new ArrayList<>(); + + private AmbariClientCommon ambariClient; + + PollingConfigAnalyzer internalMonitor; + + GatewayConfig gatewayConfig = null; + + static String getType() { + return TYPE; + } + + AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) { + this.gatewayConfig = config; + this.ambariClient = new AmbariClientCommon(aliasService); + this.internalMonitor = new PollingConfigAnalyzer(this); + + // Override the default polling interval if it has been configured + int interval = config.getClusterMonitorPollingInterval(getType()); + if (interval > 0) { + setPollingInterval(interval); + } + + init(); + } + + @Override + public void setPollingInterval(int interval) { + internalMonitor.setInterval(interval); + } + + private void init() { + loadDiscoveryConfiguration(); + loadClusterVersionData(); + } + + /** + * Load any previously-persisted service discovery configurations. + * This is necessary for checking previously-deployed topologies. + */ + private void loadDiscoveryConfiguration() { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false); + for (File persisted : persistedConfigs) { + Properties props = new Properties(); + try { + props.load(new FileInputStream(persisted)); + + addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() { + public String getAddress() { + return props.getProperty(PROP_CLUSTER_SOURCE); + } + + public String getUser() { + return props.getProperty(PROP_CLUSTER_USER); + } + + public String getPasswordAlias() { + return props.getProperty(PROP_CLUSTER_ALIAS); + } + }); + } catch (IOException e) { + log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e); + } + } + } + } + + /** + * Load any previously-persisted cluster configuration version records, so the monitor will check + * previously-deployed topologies against the current cluster configuration. + */ + private void loadClusterVersionData() { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Collection<File> persistedConfigs = FileUtils.listFiles(getPersistenceDir(), new String[]{"ver"}, false); + for (File persisted : persistedConfigs) { + Properties props = new Properties(); + try { + props.load(new FileInputStream(persisted)); + + String source = props.getProperty(PROP_CLUSTER_SOURCE); + String clusterName = props.getProperty(PROP_CLUSTER_NAME); + + Map<String, String> configVersions = new HashMap<>(); + for (String name : props.stringPropertyNames()) { + if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties + configVersions.put(name, props.getProperty(name)); + } + } + + // Map the config versions to the cluster name + addClusterConfigVersions(source, clusterName, configVersions); + + } catch (IOException e) { + log.failedToLoadClusterMonitorConfigVersions(getType(), e); + } + } + } + } + + private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + + Properties props = new Properties(); + props.setProperty(PROP_CLUSTER_NAME, clusterName); + props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress()); + + String username = sdc.getUser(); + if (username != null) { + props.setProperty(PROP_CLUSTER_USER, username); + } + String pwdAlias = sdc.getPasswordAlias(); + if (pwdAlias != null) { + props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias); + } + + persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName)); + } + } + + private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Properties props = new Properties(); + props.setProperty(PROP_CLUSTER_NAME, clusterName); + props.setProperty(PROP_CLUSTER_SOURCE, address); + for (String name : configVersions.keySet()) { + props.setProperty(name, configVersions.get(name)); + } + + persist(props, getConfigVersionsPersistenceFile(address, clusterName)); + } + } + + private void persist(Properties props, File dest) { + try { + props.store(new FileOutputStream(dest), PERSISTED_FILE_COMMENT); + } catch (Exception e) { + log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e); + } + } + + private File getPersistenceDir() { + File persistenceDir = null; + + File dataDir = new File(gatewayConfig.getGatewayDataDir()); + if (dataDir.exists()) { + File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME); + if (!clustersDir.exists()) { + clustersDir.mkdirs(); + } + persistenceDir = clustersDir; + } + + return persistenceDir; + } + + private File getDiscoveryConfigPersistenceFile(String address, String clusterName) { + return getPersistenceFile(address, clusterName, "conf"); + } + + private File getConfigVersionsPersistenceFile(String address, String clusterName) { + return getPersistenceFile(address, clusterName, "ver"); + } + + private File getPersistenceFile(String address, String clusterName, String ext) { + String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext; + return new File(getPersistenceDir(), fileName); + } + + /** + * Add cluster configuration details to the monitor's in-memory record. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * @param configVersions A Map of configuration types and their corresponding versions. + */ + private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) { + configVersionsLock.writeLock().lock(); + try { + ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>()) + .put(clusterName, configVersions); + } finally { + configVersionsLock.writeLock().unlock(); + } + } + + public void start() { + (new Thread(internalMonitor, "AmbariConfigurationMonitor")).start(); + } + + public void stop() { + internalMonitor.stop(); + } + + @Override + public void addListener(ConfigurationChangeListener listener) { + changeListeners.add(listener); + } + + /** + * Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for + * changes. + * + * @param clusterName The name of the cluster. + * @param config The associated service discovery configuration. + */ + void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) { + clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config); + } + + + /** + * Get the service discovery configuration associated with the specified Ambari instance and cluster. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return The associated ServiceDiscoveryConfig object. + */ + ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) { + ServiceDiscoveryConfig config = null; + if (clusterMonitorConfigurations.containsKey(address)) { + config = clusterMonitorConfigurations.get(address).get(clusterName); + } + return config; + } + + + /** + * Add cluster configuration data to the monitor, which it will use when determining if configuration has changed. + * + * @param cluster An AmbariCluster object. + * @param discoveryConfig The discovery configuration associated with the cluster. + */ + void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) { + + String clusterName = cluster.getName(); + + // Register the cluster discovery configuration for the monitor connections + persistDiscoveryConfiguration(clusterName, discoveryConfig); + addDiscoveryConfig(clusterName, discoveryConfig); + + // Build the set of configuration versions + Map<String, String> configVersions = new HashMap<>(); + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations(); + for (String serviceName : serviceConfigs.keySet()) { + Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName); + for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) { + String configType = config.getType(); + String version = config.getVersion(); + configVersions.put(configType, version); + } + } + + persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions); + addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions); + } + + + /** + * Remove the configuration record for the specified Ambari instance and cluster name. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return The removed data; A Map of configuration types and their corresponding versions. + */ + Map<String, String> removeClusterConfigVersions(String address, String clusterName) { + Map<String, String> result = new HashMap<>(); + + configVersionsLock.writeLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + result.putAll(ambariClusterConfigVersions.get(address).remove(clusterName)); + } + } finally { + configVersionsLock.writeLock().unlock(); + } + + // Delete the associated persisted record + File persisted = getConfigVersionsPersistenceFile(address, clusterName); + if (persisted.exists()) { + persisted.delete(); + } + + return result; + } + + /** + * Get the cluster configuration details for the specified cluster and Ambari instance. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return A Map of configuration types and their corresponding versions. + */ + Map<String, String> getClusterConfigVersions(String address, String clusterName) { + Map<String, String> result = new HashMap<>(); + + configVersionsLock.readLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + result.putAll(ambariClusterConfigVersions.get(address).get(clusterName)); + } + } finally { + configVersionsLock.readLock().unlock(); + } + + return result; + } + + + /** + * Get all the clusters the monitor knows about. + * + * @return A Map of Ambari instance addresses to associated cluster names. + */ + Map<String, List<String>> getClusterNames() { + Map<String, List<String>> result = new HashMap<>(); + + configVersionsLock.readLock().lock(); + try { + for (String address : ambariClusterConfigVersions.keySet()) { + List<String> clusterNames = new ArrayList<>(); + clusterNames.addAll(ambariClusterConfigVersions.get(address).keySet()); + result.put(address, clusterNames); + } + } finally { + configVersionsLock.readLock().unlock(); + } + + return result; + + } + + + /** + * Notify registered change listeners. + * + * @param source The address of the Ambari instance from which the cluster details were determined. + * @param clusterName The name of the cluster whose configuration details have changed. + */ + void notifyChangeListeners(String source, String clusterName) { + for (ConfigurationChangeListener listener : changeListeners) { + listener.onConfigurationChange(source, clusterName); + } + } + + + /** + * Request the current active configuration version info from Ambari. + * + * @param address The Ambari instance address. + * @param clusterName The name of the cluster for which the details are desired. + * + * @return A Map of service configuration types and their corresponding versions. + */ + Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { + Map<String, String> configVersions = new HashMap<>(); + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = + ambariClient.getActiveServiceConfigurations(clusterName, getDiscoveryConfig(address, clusterName)); + + for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) { + for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) { + configVersions.put(config.getType(), config.getVersion()); + } + } + + return configVersions; + } + + + /** + * The thread that polls Ambari for configuration details for clusters associated with discovered topologies, + * compares them with the current recorded values, and notifies any listeners when differences are discovered. + */ + static final class PollingConfigAnalyzer implements Runnable { + + private static final int DEFAULT_POLLING_INTERVAL = 60; + + // Polling interval in seconds + private int interval = DEFAULT_POLLING_INTERVAL; + + private AmbariConfigurationMonitor delegate; + + private boolean isActive = false; + + PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) { + this.delegate = delegate; + this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL); + } + + void setInterval(int interval) { + this.interval = interval; + } + + + void stop() { + isActive = false; + } + + @Override + public void run() { + isActive = true; + + log.startedAmbariConfigMonitor(interval); + + while (isActive) { + for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) { + String address = entry.getKey(); + for (String clusterName : entry.getValue()) { + Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName); + if (configVersions != null && !configVersions.isEmpty()) { + Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName); + if (updatedVersions != null && !updatedVersions.isEmpty()) { + boolean configHasChanged = false; + + // If the config sets don't match in size, then something has changed + if (updatedVersions.size() != configVersions.size()) { + configHasChanged = true; + } else { + // Perform the comparison of all the config versions + for (Map.Entry<String, String> configVersion : configVersions.entrySet()) { + if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) { + configHasChanged = true; + break; + } + } + } + + // If a change has occurred, notify the listeners + if (configHasChanged) { + delegate.notifyChangeListeners(address, clusterName); + } + } + } + } + } + + try { + Thread.sleep(interval * 1000); + } catch (InterruptedException e) { + // Ignore + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java index b7f9f53..765a928 100644 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscovery.java @@ -16,7 +16,7 @@ */ package org.apache.hadoop.gateway.topology.discovery.ambari; -import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,38 +25,32 @@ import java.util.Properties; import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; -import net.minidev.json.JSONValue; -import org.apache.hadoop.gateway.config.ConfigurationException; import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.GatewayServices; import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.services.security.AliasServiceException; +import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService; +import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; import org.apache.hadoop.gateway.topology.discovery.GatewayService; import org.apache.hadoop.gateway.topology.discovery.ServiceDiscovery; import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.message.BasicHeader; -import org.apache.http.util.EntityUtils; class AmbariServiceDiscovery implements ServiceDiscovery { static final String TYPE = "AMBARI"; - static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters"; + static final String AMBARI_CLUSTERS_URI = AmbariClientCommon.AMBARI_CLUSTERS_URI; - static final String AMBARI_HOSTROLES_URI = - AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles"; + static final String AMBARI_HOSTROLES_URI = AmbariClientCommon.AMBARI_HOSTROLES_URI; - static final String AMBARI_SERVICECONFIGS_URI = - AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true"; + static final String AMBARI_SERVICECONFIGS_URI = AmbariClientCommon.AMBARI_SERVICECONFIGS_URI; private static final String COMPONENT_CONFIG_MAPPING_FILE = "ambari-service-discovery-component-config-mapping.properties"; + private static final String GATEWAY_SERVICES_ACCESSOR_CLASS = "org.apache.hadoop.gateway.GatewayServer"; + private static final String GATEWAY_SERVICES_ACCESSOR_METHOD = "getGatewayServices"; + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); // Map of component names to service configuration types @@ -69,21 +63,76 @@ class AmbariServiceDiscovery implements ServiceDiscovery { componentServiceConfigs.put(componentName, configMapping.getProperty(componentName)); } } catch (Exception e) { - log.failedToLoadServiceDiscoveryConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e); + log.failedToLoadServiceDiscoveryURLDefConfiguration(COMPONENT_CONFIG_MAPPING_FILE, e); } } - private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user"; - private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password"; - @GatewayService private AliasService aliasService; - private CloseableHttpClient httpClient = null; + private RESTInvoker restClient; + private AmbariClientCommon ambariClient; + // This is used to update the monitor when new cluster configuration details are discovered. + private AmbariConfigurationMonitor configChangeMonitor; + + private boolean isInitialized = false; AmbariServiceDiscovery() { - httpClient = org.apache.http.impl.client.HttpClients.createDefault(); + } + + + AmbariServiceDiscovery(RESTInvoker restClient) { + this.restClient = restClient; + } + + + /** + * Initialization must be subsequent to construction because the AliasService member isn't assigned until after + * construction time. This is called internally prior to discovery invocations to make sure the clients have been + * initialized. + */ + private void init() { + if (!isInitialized) { + if (this.restClient == null) { + this.restClient = new RESTInvoker(aliasService); + } + this.ambariClient = new AmbariClientCommon(restClient); + this.configChangeMonitor = getConfigurationChangeMonitor(); + + isInitialized = true; + } + } + + + /** + * Get the Ambari configuration change monitor from the associated gateway service. + */ + private AmbariConfigurationMonitor getConfigurationChangeMonitor() { + AmbariConfigurationMonitor ambariMonitor = null; + try { + Class clazz = Class.forName(GATEWAY_SERVICES_ACCESSOR_CLASS); + if (clazz != null) { + Method m = clazz.getDeclaredMethod(GATEWAY_SERVICES_ACCESSOR_METHOD); + if (m != null) { + Object obj = m.invoke(null); + if (GatewayServices.class.isAssignableFrom(obj.getClass())) { + ClusterConfigurationMonitorService clusterMonitorService = + ((GatewayServices) obj).getService(GatewayServices.CLUSTER_CONFIGURATION_MONITOR_SERVICE); + ClusterConfigurationMonitor monitor = + clusterMonitorService.getMonitor(AmbariConfigurationMonitor.getType()); + if (monitor != null) { + if (AmbariConfigurationMonitor.class.isAssignableFrom(monitor.getClass())) { + ambariMonitor = (AmbariConfigurationMonitor) monitor; + } + } + } + } + } + } catch (Exception e) { + log.errorAccessingConfigurationChangeMonitor(e); + } + return ambariMonitor; } @@ -95,14 +144,16 @@ class AmbariServiceDiscovery implements ServiceDiscovery { @Override public Map<String, Cluster> discover(ServiceDiscoveryConfig config) { - Map<String, Cluster> clusters = new HashMap<String, Cluster>(); + Map<String, Cluster> clusters = new HashMap<>(); + + init(); String discoveryAddress = config.getAddress(); // Invoke Ambari REST API to discover the available clusters String clustersDiscoveryURL = String.format("%s" + AMBARI_CLUSTERS_URI, discoveryAddress); - JSONObject json = invokeREST(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias()); + JSONObject json = restClient.invoke(clustersDiscoveryURL, config.getUser(), config.getPasswordAlias()); // Parse the cluster names from the response, and perform the cluster discovery JSONArray clusterItems = (JSONArray) json.get("items"); @@ -126,13 +177,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery { Map<String, String> serviceComponents = new HashMap<>(); + init(); + String discoveryAddress = config.getAddress(); String discoveryUser = config.getUser(); String discoveryPwdAlias = config.getPasswordAlias(); Map<String, List<String>> componentHostNames = new HashMap<>(); String hostRolesURL = String.format("%s" + AMBARI_HOSTROLES_URI, discoveryAddress, clusterName); - JSONObject hostRolesJSON = invokeREST(hostRolesURL, discoveryUser, discoveryPwdAlias); + JSONObject hostRolesJSON = restClient.invoke(hostRolesURL, discoveryUser, discoveryPwdAlias); if (hostRolesJSON != null) { // Process the host roles JSON JSONArray items = (JSONArray) hostRolesJSON.get("items"); @@ -158,7 +211,7 @@ class AmbariServiceDiscovery implements ServiceDiscovery { if (hostName != null) { log.discoveredServiceHost(serviceName, hostName); if (!componentHostNames.containsKey(componentName)) { - componentHostNames.put(componentName, new ArrayList<String>()); + componentHostNames.put(componentName, new ArrayList<>()); } componentHostNames.get(componentName).add(hostName); } @@ -167,31 +220,15 @@ class AmbariServiceDiscovery implements ServiceDiscovery { } } + // Service configurations Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = - new HashMap<String, Map<String, AmbariCluster.ServiceConfiguration>>(); - String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName); - JSONObject serviceConfigsJSON = invokeREST(serviceConfigsURL, discoveryUser, discoveryPwdAlias); - if (serviceConfigsJSON != null) { - // Process the service configurations - JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items"); - for (Object serviceConfig : serviceConfigs) { - String serviceName = (String) ((JSONObject) serviceConfig).get("service_name"); - JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations"); - for (Object configuration : configurations) { - String configType = (String) ((JSONObject) configuration).get("type"); - String configVersion = String.valueOf(((JSONObject) configuration).get("version")); - - Map<String, String> configProps = new HashMap<String, String>(); - JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties"); - for (String propertyName : configProperties.keySet()) { - configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName))); - } - if (!serviceConfigurations.containsKey(serviceName)) { - serviceConfigurations.put(serviceName, new HashMap<String, AmbariCluster.ServiceConfiguration>()); - } - serviceConfigurations.get(serviceName).put(configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps)); - cluster.addServiceConfiguration(serviceName, configType, new AmbariCluster.ServiceConfiguration(configType, configVersion, configProps)); - } + ambariClient.getActiveServiceConfigurations(discoveryAddress, + clusterName, + discoveryUser, + discoveryPwdAlias); + for (String serviceName : serviceConfigurations.keySet()) { + for (Map.Entry<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigurations.get(serviceName).entrySet()) { + cluster.addServiceConfiguration(serviceName, serviceConfig.getKey(), serviceConfig.getValue()); } } @@ -214,93 +251,12 @@ class AmbariServiceDiscovery implements ServiceDiscovery { } } - return cluster; - } - - - protected JSONObject invokeREST(String url, String username, String passwordAlias) { - JSONObject result = null; - - CloseableHttpResponse response = null; - try { - HttpGet request = new HttpGet(url); - - // If no configured username, then use default username alias - String password = null; - if (username == null) { - if (aliasService != null) { - try { - char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS); - if (defaultUser != null) { - username = new String(defaultUser); - } - } catch (AliasServiceException e) { - log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage()); - } - } - - // If username is still null - if (username == null) { - log.aliasServiceUserNotFound(); - throw new ConfigurationException("No username is configured for Ambari service discovery."); - } - } - - if (aliasService != null) { - // If no password alias is configured, then try the default alias - if (passwordAlias == null) { - passwordAlias = DEFAULT_PWD_ALIAS; - } - - try { - char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias); - if (pwd != null) { - password = new String(pwd); - } - - } catch (AliasServiceException e) { - log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage()); - } - } - - // If the password could not be determined - if (password == null) { - log.aliasServicePasswordNotFound(); - throw new ConfigurationException("No password is configured for Ambari service discovery."); - } - - // Add an auth header if credentials are available - String encodedCreds = - org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes()); - request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds)); - - response = httpClient.execute(request); - - if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { - HttpEntity entity = response.getEntity(); - if (entity != null) { - result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity))); - log.debugJSON(result.toJSONString()); - } else { - log.noJSON(url); - } - } else { - log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode()); - } - - } catch (IOException e) { - log.restInvocationError(url, e); - } finally { - if(response != null) { - try { - response.close(); - } catch (IOException e) { - // Ignore - } - } + if (configChangeMonitor != null) { + // Notify the cluster config monitor about these cluster configuration details + configChangeMonitor.addClusterConfigVersions(cluster, config); } - return result; - } + return cluster; + } } http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java index 0661224..51bbe0e 100644 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryMessages.java @@ -25,24 +25,44 @@ import org.apache.hadoop.gateway.i18n.messages.StackTrace; public interface AmbariServiceDiscoveryMessages { @Message(level = MessageLevel.ERROR, - text = "Failed to load service discovery configuration: {1}") - void failedToLoadServiceDiscoveryConfiguration(@StackTrace(level = MessageLevel.ERROR) Exception e); + text = "Failed to persist data for cluster configuration monitor {0} {1}: {2}") + void failedToPersistClusterMonitorData(final String monitor, + final String filename, + @StackTrace(level = MessageLevel.DEBUG) Exception e); @Message(level = MessageLevel.ERROR, - text = "Failed to load service discovery configuration {0}: {1}") - void failedToLoadServiceDiscoveryConfiguration(final String configuration, - @StackTrace(level = MessageLevel.ERROR) Exception e); + text = "Failed to load persisted service discovery configuration for cluster monitor {0} : {1}") + void failedToLoadClusterMonitorServiceDiscoveryConfig(final String monitor, + @StackTrace(level = MessageLevel.DEBUG) Exception e); + + @Message(level = MessageLevel.ERROR, + text = "Failed to load persisted cluster configuration version data for cluster monitor {0} : {1}") + void failedToLoadClusterMonitorConfigVersions(final String monitor, + @StackTrace(level = MessageLevel.DEBUG) Exception e); + + @Message(level = MessageLevel.ERROR, + text = "Unable to access the Ambari Configuration Change Monitor: {0}") + void errorAccessingConfigurationChangeMonitor(@StackTrace(level = MessageLevel.DEBUG) Exception e); + + @Message(level = MessageLevel.ERROR, + text = "Failed to load service discovery URL definition configuration: {1}") + void failedToLoadServiceDiscoveryURLDefConfiguration(@StackTrace(level = MessageLevel.DEBUG) Exception e); + + @Message(level = MessageLevel.ERROR, + text = "Failed to load service discovery URL definition configuration {0}: {1}") + void failedToLoadServiceDiscoveryURLDefConfiguration(final String configuration, + @StackTrace(level = MessageLevel.ERROR) Exception e); @Message(level = MessageLevel.ERROR, text = "Encountered an error during cluster {0} discovery: {1}") void clusterDiscoveryError(final String clusterName, - @StackTrace(level = MessageLevel.ERROR) Exception e); + @StackTrace(level = MessageLevel.DEBUG) Exception e); @Message(level = MessageLevel.DEBUG, text = "REST invocation {0} failed: {1}") void restInvocationError(final String url, - @StackTrace(level = MessageLevel.ERROR) Exception e); + @StackTrace(level = MessageLevel.DEBUG) Exception e); @Message(level = MessageLevel.ERROR, @@ -75,20 +95,23 @@ public interface AmbariServiceDiscoveryMessages { void noJSON(final String url); - @Message(level = MessageLevel.DEBUG, + @Message(level = MessageLevel.TRACE, text = "REST invocation result: {0}") void debugJSON(final String json); + @Message(level = MessageLevel.DEBUG, - text = "Loaded component configuration mappings: {0}") + text = "Loaded component configuration mappings: {0}") void loadedComponentConfigMappings(final String mappings); + @Message(level = MessageLevel.ERROR, text = "Failed to load component configuration property mappings {0}: {1}") void failedToLoadComponentConfigMappings(final String mappings, - @StackTrace(level = MessageLevel.ERROR) Exception e); + @StackTrace(level = MessageLevel.DEBUG) Exception e); - @Message(level = MessageLevel.DEBUG, + + @Message(level = MessageLevel.TRACE, text = "Discovered: Service: {0}, Host: {1}") void discoveredServiceHost(final String serviceName, final String hostName); @@ -114,8 +137,12 @@ public interface AmbariServiceDiscoveryMessages { @Message(level = MessageLevel.DEBUG, - text = "Determined the service URL mapping property {0} value: {1}") + text = "Determined the service URL mapping property {0} value: {1}") void determinedPropertyValue(final String propertyName, final String propertyValue); + @Message(level = MessageLevel.INFO, + text = "Started Ambari cluster configuration monitor (checking every {0} seconds)") + void startedAmbariConfigMonitor(final long pollingInterval); + } http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java new file mode 100644 index 0000000..6a6fad8 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.gateway.topology.discovery.ambari; + +import net.minidev.json.JSONObject; +import net.minidev.json.JSONValue; +import org.apache.hadoop.gateway.config.ConfigurationException; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.hadoop.gateway.services.security.AliasService; +import org.apache.hadoop.gateway.services.security.AliasServiceException; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicHeader; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; + +class RESTInvoker { + + private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user"; + private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password"; + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + private AliasService aliasService = null; + + private CloseableHttpClient httpClient = org.apache.http.impl.client.HttpClients.createDefault(); + + + RESTInvoker(AliasService aliasService) { + this.aliasService = aliasService; + } + + + JSONObject invoke(String url, String username, String passwordAlias) { + JSONObject result = null; + + CloseableHttpResponse response = null; + try { + HttpGet request = new HttpGet(url); + + // If no configured username, then use default username alias + String password = null; + if (username == null) { + if (aliasService != null) { + try { + char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS); + if (defaultUser != null) { + username = new String(defaultUser); + } + } catch (AliasServiceException e) { + log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage()); + } + } + + // If username is still null + if (username == null) { + log.aliasServiceUserNotFound(); + throw new ConfigurationException("No username is configured for Ambari service discovery."); + } + } + + if (aliasService != null) { + // If no password alias is configured, then try the default alias + if (passwordAlias == null) { + passwordAlias = DEFAULT_PWD_ALIAS; + } + + try { + char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias); + if (pwd != null) { + password = new String(pwd); + } + + } catch (AliasServiceException e) { + log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage()); + } + } + + // If the password could not be determined + if (password == null) { + log.aliasServicePasswordNotFound(); + throw new ConfigurationException("No password is configured for Ambari service discovery."); + } + + // Add an auth header if credentials are available + String encodedCreds = + org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes()); + request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds)); + + response = httpClient.execute(request); + + if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity))); + log.debugJSON(result.toJSONString()); + } else { + log.noJSON(url); + } + } else { + log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode()); + } + + } catch (IOException e) { + log.restInvocationError(url, e); + } finally { + if(response != null) { + try { + response.close(); + } catch (IOException e) { + // Ignore + } + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java index 3330cc3..deb5bb3 100644 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java +++ b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/ServiceURLPropertyConfig.java @@ -110,7 +110,7 @@ class ServiceURLPropertyConfig { } } } catch (Exception e) { - log.failedToLoadServiceDiscoveryConfiguration(e); + log.failedToLoadServiceDiscoveryURLDefConfiguration(e); } finally { try { source.close(); http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider new file mode 100644 index 0000000..d9b2b05 --- /dev/null +++ b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider @@ -0,0 +1,19 @@ +########################################################################## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################## + +org.apache.hadoop.gateway.topology.discovery.ambari.AmbariClusterConfigurationMonitorProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java new file mode 100644 index 0000000..2d8b276 --- /dev/null +++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.gateway.topology.discovery.ambari; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.gateway.config.GatewayConfig; +import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class AmbariConfigurationMonitorTest { + + private File dataDir = null; + + @Before + public void setup() throws Exception { + File targetDir = new File( System.getProperty("user.dir"), "target"); + File tempDir = new File(targetDir, this.getClass().getName() + "__data__" + UUID.randomUUID()); + FileUtils.forceMkdir(tempDir); + dataDir = tempDir; + } + + @After + public void tearDown() throws Exception { + dataDir.delete(); + } + + @Test + public void testPollingMonitor() throws Exception { + final String addr1 = "http://host1:8080"; + final String addr2 = "http://host2:8080"; + final String cluster1Name = "Cluster_One"; + final String cluster2Name = "Cluster_Two"; + + + GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class); + EasyMock.expect(config.getGatewayDataDir()).andReturn(dataDir.getAbsolutePath()).anyTimes(); + EasyMock.expect(config.getClusterMonitorPollingInterval(AmbariConfigurationMonitor.getType())) + .andReturn(10) + .anyTimes(); + EasyMock.replay(config); + + // Create the monitor + TestableAmbariConfigurationMonitor monitor = new TestableAmbariConfigurationMonitor(config); + + // Clear the system property now that the monitor has been initialized + System.clearProperty(AmbariConfigurationMonitor.INTERVAL_PROPERTY_NAME); + + + // Sequence of config changes for testing monitoring for updates + Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updateConfigurations = new HashMap<>(); + + updateConfigurations.put(addr1, new HashMap<>()); + updateConfigurations.get(addr1).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "2")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "3")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), + createTestServiceConfig("hive-site", "1")))); + + updateConfigurations.put(addr2, new HashMap<>()); + updateConfigurations.get(addr2).put(cluster2Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "1")), + Collections.singletonList(createTestServiceConfig("zoo.cfg", "1")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "2")))); + + updateConfigurations.get(addr2).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), + createTestServiceConfig("hive-site", "4")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "4"), + createTestServiceConfig("yarn-site", "1")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "2")))); + + Map<String, Map<String, Integer>> configChangeIndex = new HashMap<>(); + configChangeIndex.put(addr1, new HashMap<>()); + configChangeIndex.get(addr1).put(cluster1Name, 0); + configChangeIndex.get(addr1).put(cluster2Name, 0); + configChangeIndex.put(addr2, new HashMap<>()); + configChangeIndex.get(addr2).put(cluster2Name, 0); + + // Setup the initial test update data + // Cluster 1 data change + monitor.addTestConfigVersion(addr1, cluster1Name, "zoo.cfg", "2"); + monitor.addTestConfigVersion(addr1, cluster1Name, "hive-site", "1"); + + // Cluster 2 NO data change + monitor.addTestConfigVersion(addr2, cluster1Name, "zoo.cfg", "1"); + monitor.addTestConfigVersion(addr2, cluster1Name, "hive-site", "1"); + + // Cluster 3 data change + monitor.addTestConfigVersion(addr2, cluster2Name, "zoo.cfg", "1"); + monitor.addTestConfigVersion(addr2, cluster2Name, "hive-site", "2"); + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> initialAmbariClusterConfigs = new HashMap<>(); + + Map<String, AmbariCluster.ServiceConfiguration> cluster1Configs = new HashMap<>(); + AmbariCluster.ServiceConfiguration zooCfg = createTestServiceConfig("zoo.cfg", "1"); + cluster1Configs.put("ZOOKEEPER", zooCfg); + + AmbariCluster.ServiceConfiguration hiveSite = createTestServiceConfig("hive-site", "1"); + cluster1Configs.put("Hive", hiveSite); + + initialAmbariClusterConfigs.put(cluster1Name, cluster1Configs); + AmbariCluster cluster1 = createTestCluster(cluster1Name, initialAmbariClusterConfigs); + + // Tell the monitor about the cluster configurations + monitor.addClusterConfigVersions(cluster1, createTestDiscoveryConfig(addr1)); + + monitor.addClusterConfigVersions(createTestCluster(cluster2Name, initialAmbariClusterConfigs), + createTestDiscoveryConfig(addr2)); + + monitor.addClusterConfigVersions(createTestCluster(cluster1Name, initialAmbariClusterConfigs), + createTestDiscoveryConfig(addr2)); + + final Map<String, Map<String, Integer>> changeNotifications = new HashMap<>(); + monitor.addListener((src, cname) -> { +// System.out.println("Cluster config changed: " + cname + " @ " + src); + // Record the notification + Integer notificationCount = changeNotifications.computeIfAbsent(src, s -> new HashMap<>()) + .computeIfAbsent(cname, c -> Integer.valueOf(0)); + changeNotifications.get(src).put(cname, (notificationCount+=1)); + + // Update the config version + int changeIndex = configChangeIndex.get(src).get(cname); + if (changeIndex < updateConfigurations.get(src).get(cname).size()) { + List<AmbariCluster.ServiceConfiguration> changes = updateConfigurations.get(src).get(cname).get(changeIndex); + +// System.out.println("Applying config update " + changeIndex + " to " + cname + " @ " + src + " ..."); + for (AmbariCluster.ServiceConfiguration change : changes) { + monitor.updateConfigState(src, cname, change.getType(), change.getVersion()); +// System.out.println(" Updated " + change.getType() + " to version " + change.getVersion()); + } + + // Increment the change index + configChangeIndex.get(src).replace(cname, changeIndex + 1); + +// System.out.println("Monitor config updated for " + cname + " @ " + src + " : " + changeIndex ); + } + }); + + try { + monitor.start(); + + long expiration = System.currentTimeMillis() + (1000 * 30); + while (!areChangeUpdatesExhausted(updateConfigurations, configChangeIndex) + && (System.currentTimeMillis() < expiration)) { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + // + } + } + + } finally { + monitor.stop(); + } + + assertNotNull("Expected changes to have been reported for source 1.", + changeNotifications.get(addr1)); + + assertEquals("Expected changes to have been reported.", + 3, changeNotifications.get(addr1).get(cluster1Name).intValue()); + + assertNotNull("Expected changes to have been reported for source 2.", + changeNotifications.get(addr2)); + + assertEquals("Expected changes to have been reported.", + 3, changeNotifications.get(addr2).get(cluster2Name).intValue()); + + assertNull("Expected changes to have been reported.", + changeNotifications.get(addr2).get(cluster1Name)); + } + + + private static boolean areChangeUpdatesExhausted(Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updates, + Map<String, Map<String, Integer>> configChangeIndeces) { + boolean isExhausted = true; + + for (String address : updates.keySet()) { + Map<String, List<List<AmbariCluster.ServiceConfiguration>>> clusterConfigs = updates.get(address); + for (String clusterName : clusterConfigs.keySet()) { + Integer configChangeCount = clusterConfigs.get(clusterName).size(); + if (configChangeIndeces.get(address).containsKey(clusterName)) { + if (configChangeIndeces.get(address).get(clusterName) < configChangeCount) { + isExhausted = false; + break; + } + } + } + } + + return isExhausted; + } + + /** + * + * @param name The cluster name + * @param serviceConfigs A map of service configurations (keyed by service name) + * + * @return + */ + private AmbariCluster createTestCluster(String name, + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs) { + AmbariCluster c = EasyMock.createNiceMock(AmbariCluster.class); + EasyMock.expect(c.getName()).andReturn(name).anyTimes(); + EasyMock.expect(c.getServiceConfigurations()).andReturn(serviceConfigs).anyTimes(); + EasyMock.replay(c); + return c; + } + + private AmbariCluster.ServiceConfiguration createTestServiceConfig(String name, String version) { + AmbariCluster.ServiceConfiguration sc = EasyMock.createNiceMock(AmbariCluster.ServiceConfiguration.class); + EasyMock.expect(sc.getType()).andReturn(name).anyTimes(); + EasyMock.expect(sc.getVersion()).andReturn(version).anyTimes(); + EasyMock.replay(sc); + return sc; + } + + private ServiceDiscoveryConfig createTestDiscoveryConfig(String address) { + return createTestDiscoveryConfig(address, null, null); + } + + private ServiceDiscoveryConfig createTestDiscoveryConfig(String address, String username, String pwdAlias) { + ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class); + EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes(); + EasyMock.expect(sdc.getUser()).andReturn(username).anyTimes(); + EasyMock.expect(sdc.getPasswordAlias()).andReturn(pwdAlias).anyTimes(); + EasyMock.replay(sdc); + return sdc; + } + + /** + * AmbariConfigurationMonitor extension that replaces the collection of updated configuration data with a static + * mechanism rather than the REST invocation mechanism. + */ + private static final class TestableAmbariConfigurationMonitor extends AmbariConfigurationMonitor { + + Map<String, Map<String, Map<String, String>>> configVersionData = new HashMap<>(); + + TestableAmbariConfigurationMonitor(GatewayConfig config) { + super(config, null); + } + + void addTestConfigVersion(String address, String clusterName, String configType, String configVersion) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .put(configType, configVersion); + } + + void addTestConfigVersions(String address, String clusterName, Map<String, String> configVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .putAll(configVersions); + } + + void updateTestConfigVersion(String address, String clusterName, String configType, String updatedVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .replace(configType, updatedVersions); + } + + void updateTestConfigVersions(String address, String clusterName, Map<String, String> updatedVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .replaceAll((k,v) -> updatedVersions.get(k)); + } + + void updateConfigState(String address, String clusterName, String configType, String configVersion) { + configVersionsLock.writeLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + ambariClusterConfigVersions.get(address).get(clusterName).replace(configType, configVersion); + } + } finally { + configVersionsLock.writeLock().unlock(); + } + } + + @Override + Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { + Map<String, Map<String, String>> clusterConfigVersions = configVersionData.get(address); + if (clusterConfigVersions != null) { + return clusterConfigVersions.get(clusterName); + } + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java index f7f0553..d4dad95 100644 --- a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java +++ b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariServiceDiscoveryTest.java @@ -119,26 +119,38 @@ public class AmbariServiceDiscoveryTest { */ private static final class TestAmbariServiceDiscovery extends AmbariServiceDiscovery { + final static String CLUSTER_PLACEHOLDER = TestRESTInvoker.CLUSTER_PLACEHOLDER; + + TestAmbariServiceDiscovery(String clusterName) { + super(new TestRESTInvoker(clusterName)); + } + + } + + private static final class TestRESTInvoker extends RESTInvoker { + final static String CLUSTER_PLACEHOLDER = "CLUSTER_NAME"; private Map<String, JSONObject> cannedResponses = new HashMap<>(); - TestAmbariServiceDiscovery(String clusterName) { + TestRESTInvoker(String clusterName) { + super(null); + cannedResponses.put(AmbariServiceDiscovery.AMBARI_CLUSTERS_URI, - (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, - clusterName))); + (JSONObject) JSONValue.parse(CLUSTERS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, + clusterName))); cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_HOSTROLES_URI, clusterName), - (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, - clusterName))); + (JSONObject) JSONValue.parse(HOSTROLES_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, + clusterName))); cannedResponses.put(String.format(AmbariServiceDiscovery.AMBARI_SERVICECONFIGS_URI, clusterName), - (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, - clusterName))); + (JSONObject) JSONValue.parse(SERVICECONFIGS_JSON_TEMPLATE.replaceAll(CLUSTER_PLACEHOLDER, + clusterName))); } @Override - protected JSONObject invokeREST(String url, String username, String passwordAlias) { + JSONObject invoke(String url, String username, String passwordAlias) { return cannedResponses.get(url.substring(url.indexOf("/api"))); } } http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-release/home/conf/gateway-site.xml ---------------------------------------------------------------------- diff --git a/gateway-release/home/conf/gateway-site.xml b/gateway-release/home/conf/gateway-site.xml index e06db72..fec5e87 100644 --- a/gateway-release/home/conf/gateway-site.xml +++ b/gateway-release/home/conf/gateway-site.xml @@ -73,4 +73,16 @@ limitations under the License. <description>Enable/Disable cookie scoping feature.</description> </property> + <property> + <name>gateway.cluster.config.monitor.ambari.enabled</name> + <value>false</value> + <description>Enable/disable Ambari cluster configuration monitoring.</description> + </property> + + <property> + <name>gateway.cluster.config.monitor.ambari.interval</name> + <value>60</value> + <description>The interval (in seconds) for polling Ambari for cluster configuration changes.</description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/knox/blob/a874f399/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java index ab0ab39..92b02ea 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java @@ -596,4 +596,20 @@ public interface GatewayMessages { text = "Correcting the suspect permissions for the remote configuration registry entry \"{0}\"." ) void correctingSuspectWritableRemoteConfigurationEntry(String entryPath); + @Message(level = MessageLevel.INFO, + text = "A cluster configuration change was noticed for {1} @ {0}") + void noticedClusterConfigurationChange(final String source, final String clusterName); + + + @Message(level = MessageLevel.INFO, + text = "Triggering topology regeneration for descriptor {2} because of change to the {1} @ {0} configuration.") + void triggeringTopologyRegeneration(final String source, final String clusterName, final String affected); + + + @Message(level = MessageLevel.ERROR, + text = "Encountered an error while responding to {1} @ {0} configuration change: {2}") + void errorRespondingToConfigChange(final String source, + final String clusterName, + @StackTrace(level = MessageLevel.DEBUG) Exception e); + }
