KNOX-998 - Merge from trunk 0.14.0 code
Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/e766b3b7 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/e766b3b7 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/e766b3b7 Branch: refs/heads/KNOX-998-Package_Restructuring Commit: e766b3b77bf2d3a0a00e4f8bf8ef261a5f8122fb Parents: 22a7304 Author: Sandeep More <m...@apache.org> Authored: Thu Dec 14 16:11:49 2017 -0500 Committer: Sandeep More <m...@apache.org> Committed: Thu Dec 14 16:11:49 2017 -0500 ---------------------------------------------------------------------- .../discovery/ambari/AmbariClientCommon.java | 102 ---- ...bariClusterConfigurationMonitorProvider.java | 35 -- .../ambari/AmbariConfigurationMonitor.java | 525 ---------------- .../topology/discovery/ambari/RESTInvoker.java | 136 ----- .../discovery/ambari/AmbariClientCommon.java | 102 ++++ ...bariClusterConfigurationMonitorProvider.java | 36 ++ .../ambari/AmbariConfigurationMonitor.java | 525 ++++++++++++++++ .../topology/discovery/ambari/RESTInvoker.java | 136 +++++ ...iscovery.ClusterConfigurationMonitorProvider | 19 - ...iscovery.ClusterConfigurationMonitorProvider | 19 + .../ambari/AmbariConfigurationMonitorTest.java | 319 ---------- .../ambari/AmbariConfigurationMonitorTest.java | 319 ++++++++++ ...faultClusterConfigurationMonitorService.java | 81 --- .../DefaultConfigurationMonitorProvider.java | 31 - .../DefaultRemoteConfigurationMonitor.java | 228 ------- .../RemoteConfigurationMonitorFactory.java | 74 --- .../gateway/services/CLIGatewayServices.java | 2 +- ...faultClusterConfigurationMonitorService.java | 81 +++ .../DefaultConfigurationMonitorProvider.java | 31 + .../DefaultRemoteConfigurationMonitor.java | 228 +++++++ .../RemoteConfigurationMonitorFactory.java | 74 +++ .../org/apache/knox/gateway/util/KnoxCLI.java | 16 +- ...y.monitor.RemoteConfigurationMonitorProvider | 19 - ...y.monitor.RemoteConfigurationMonitorProvider | 19 + ...emoteConfigurationRegistryClientService.java | 263 -------- ...figurationRegistryClientServiceProvider.java | 32 - .../ZooKeeperConfigurationMonitorTest.java | 355 ----------- ...emoteConfigurationRegistryClientService.java | 263 ++++++++ ...figurationRegistryClientServiceProvider.java | 32 + .../ZooKeeperConfigurationMonitorTest.java | 355 +++++++++++ .../apache/knox/gateway/util/KnoxCLITest.java | 2 +- ...teConfigurationRegistryClientServiceProvider | 19 - ...teConfigurationRegistryClientServiceProvider | 19 + .../services/ambariui/2.2.1/service.xml | 0 .../remote/RemoteConfigurationMessages.java | 49 -- ...nfigurationRegistryClientServiceFactory.java | 41 -- ...figurationRegistryClientServiceProvider.java | 27 - .../RemoteConfigurationRegistryConfig.java | 43 -- .../DefaultRemoteConfigurationRegistries.java | 104 ---- .../config/RemoteConfigurationRegistries.java | 33 - .../RemoteConfigurationRegistriesAccessor.java | 60 -- .../RemoteConfigurationRegistriesParser.java | 48 -- .../config/RemoteConfigurationRegistry.java | 139 ----- .../config/remote/zk/CuratorClientService.java | 464 -------------- .../RemoteConfigurationRegistryJAASConfig.java | 179 ------ .../remote/zk/ZooKeeperClientService.java | 25 - .../zk/ZooKeeperClientServiceProvider.java | 34 -- .../remote/RemoteConfigurationMessages.java | 49 ++ ...nfigurationRegistryClientServiceFactory.java | 41 ++ ...figurationRegistryClientServiceProvider.java | 27 + .../RemoteConfigurationRegistryConfig.java | 43 ++ .../DefaultRemoteConfigurationRegistries.java | 104 ++++ .../config/RemoteConfigurationRegistries.java | 33 + .../RemoteConfigurationRegistriesAccessor.java | 60 ++ .../RemoteConfigurationRegistriesParser.java | 48 ++ .../config/RemoteConfigurationRegistry.java | 139 +++++ .../config/remote/zk/CuratorClientService.java | 464 ++++++++++++++ .../RemoteConfigurationRegistryJAASConfig.java | 179 ++++++ .../remote/zk/ZooKeeperClientService.java | 25 + .../zk/ZooKeeperClientServiceProvider.java | 34 ++ ...teConfigurationRegistryClientServiceProvider | 19 - ...teConfigurationRegistryClientServiceProvider | 19 + ...efaultRemoteConfigurationRegistriesTest.java | 184 ------ ...teConfigurationRegistryConfigParserTest.java | 108 ---- .../util/RemoteRegistryConfigTestUtils.java | 117 ---- ...eConfigurationRegistryClientServiceTest.java | 424 ------------- ...moteConfigurationRegistryJAASConfigTest.java | 255 -------- ...efaultRemoteConfigurationRegistriesTest.java | 184 ++++++ ...teConfigurationRegistryConfigParserTest.java | 115 ++++ .../util/RemoteRegistryConfigTestUtils.java | 117 ++++ ...eConfigurationRegistryClientServiceTest.java | 424 +++++++++++++ ...moteConfigurationRegistryJAASConfigTest.java | 255 ++++++++ .../RemoteConfigurationRegistryClient.java | 80 --- ...emoteConfigurationRegistryClientService.java | 28 - .../ClusterConfigurationMonitorService.java | 43 -- .../discovery/ClusterConfigurationMonitor.java | 48 -- .../ClusterConfigurationMonitorProvider.java | 27 - .../monitor/RemoteConfigurationMonitor.java | 24 - .../RemoteConfigurationMonitorProvider.java | 34 -- .../RemoteConfigurationRegistryClient.java | 80 +++ ...emoteConfigurationRegistryClientService.java | 28 + .../ClusterConfigurationMonitorService.java | 43 ++ .../discovery/ClusterConfigurationMonitor.java | 48 ++ .../ClusterConfigurationMonitorProvider.java | 27 + .../monitor/RemoteConfigurationMonitor.java | 24 + .../RemoteConfigurationMonitorProvider.java | 34 ++ .../SimpleDescriptorHandlerFuncTest.java | 275 --------- .../monitor/RemoteConfigurationMonitorTest.java | 603 ------------------- .../SimpleDescriptorHandlerFuncTest.java | 275 +++++++++ .../monitor/RemoteConfigurationMonitorTest.java | 603 +++++++++++++++++++ ...eway.topology.discovery.ServiceDiscoveryType | 19 - ...eway.topology.discovery.ServiceDiscoveryType | 19 + 92 files changed, 5790 insertions(+), 5782 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java deleted file mode 100644 index a2bf4ea..0000000 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClientCommon.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.discovery.ambari; - -import net.minidev.json.JSONArray; -import net.minidev.json.JSONObject; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; - -import java.util.HashMap; -import java.util.Map; - -class AmbariClientCommon { - - static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters"; - - static final String AMBARI_HOSTROLES_URI = - AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles"; - - static final String AMBARI_SERVICECONFIGS_URI = - AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true"; - - private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); - - private RESTInvoker restClient; - - - AmbariClientCommon(AliasService aliasService) { - this(new RESTInvoker(aliasService)); - } - - - AmbariClientCommon(RESTInvoker restInvoker) { - this.restClient = restInvoker; - } - - - - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String clusterName, - ServiceDiscoveryConfig config) { - return getActiveServiceConfigurations(config.getAddress(), - clusterName, - config.getUser(), - config.getPasswordAlias()); - } - - - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String discoveryAddress, - String clusterName, - String discoveryUser, - String discoveryPwdAlias) { - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = new HashMap<>(); - - String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName); - - JSONObject serviceConfigsJSON = restClient.invoke(serviceConfigsURL, discoveryUser, discoveryPwdAlias); - if (serviceConfigsJSON != null) { - // Process the service configurations - JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items"); - for (Object serviceConfig : serviceConfigs) { - String serviceName = (String) ((JSONObject) serviceConfig).get("service_name"); - JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations"); - for (Object configuration : configurations) { - String configType = (String) ((JSONObject) configuration).get("type"); - String configVersion = String.valueOf(((JSONObject) configuration).get("version")); - - Map<String, String> configProps = new HashMap<>(); - JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties"); - for (String propertyName : configProperties.keySet()) { - configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName))); - } - if (!serviceConfigurations.containsKey(serviceName)) { - serviceConfigurations.put(serviceName, new HashMap<>()); - } - serviceConfigurations.get(serviceName).put(configType, - new AmbariCluster.ServiceConfiguration(configType, - configVersion, - configProps)); - } - } - } - - return serviceConfigurations; - } - - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java deleted file mode 100644 index 3b31124..0000000 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.discovery.ambari; - -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; -import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider; - -public class AmbariClusterConfigurationMonitorProvider implements ClusterConfigurationMonitorProvider { - - @Override - public String getType() { - return AmbariConfigurationMonitor.getType(); - } - - @Override - public ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService) { - return new AmbariConfigurationMonitor(config, aliasService); - } -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java deleted file mode 100644 index e4b5e43..0000000 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java +++ /dev/null @@ -1,525 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.discovery.ambari; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; -import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - - -class AmbariConfigurationMonitor implements ClusterConfigurationMonitor { - - private static final String TYPE = "Ambari"; - - private static final String CLUSTERS_DATA_DIR_NAME = "clusters"; - - private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!"; - - private static final String PROP_CLUSTER_PREFIX = "cluster."; - private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source"; - private static final String PROP_CLUSTER_NAME = PROP_CLUSTER_PREFIX + "name"; - private static final String PROP_CLUSTER_USER = PROP_CLUSTER_PREFIX + "user"; - private static final String PROP_CLUSTER_ALIAS = PROP_CLUSTER_PREFIX + "pwd.alias"; - - static final String INTERVAL_PROPERTY_NAME = "org.apache.hadoop.gateway.topology.discovery.ambari.monitor.interval"; - - - private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); - - // Ambari address - // clusterName -> ServiceDiscoveryConfig - // - Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>(); - - // Ambari address - // clusterName - // configType -> version - // - Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>(); - - ReadWriteLock configVersionsLock = new ReentrantReadWriteLock(); - - private List<ConfigurationChangeListener> changeListeners = new ArrayList<>(); - - private AmbariClientCommon ambariClient; - - PollingConfigAnalyzer internalMonitor; - - GatewayConfig gatewayConfig = null; - - static String getType() { - return TYPE; - } - - AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) { - this.gatewayConfig = config; - this.ambariClient = new AmbariClientCommon(aliasService); - this.internalMonitor = new PollingConfigAnalyzer(this); - - // Override the default polling interval if it has been configured - int interval = config.getClusterMonitorPollingInterval(getType()); - if (interval > 0) { - setPollingInterval(interval); - } - - init(); - } - - @Override - public void setPollingInterval(int interval) { - internalMonitor.setInterval(interval); - } - - private void init() { - loadDiscoveryConfiguration(); - loadClusterVersionData(); - } - - /** - * Load any previously-persisted service discovery configurations. - * This is necessary for checking previously-deployed topologies. - */ - private void loadDiscoveryConfiguration() { - File persistenceDir = getPersistenceDir(); - if (persistenceDir != null) { - Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false); - for (File persisted : persistedConfigs) { - Properties props = new Properties(); - try { - props.load(new FileInputStream(persisted)); - - addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() { - public String getAddress() { - return props.getProperty(PROP_CLUSTER_SOURCE); - } - - public String getUser() { - return props.getProperty(PROP_CLUSTER_USER); - } - - public String getPasswordAlias() { - return props.getProperty(PROP_CLUSTER_ALIAS); - } - }); - } catch (IOException e) { - log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e); - } - } - } - } - - /** - * Load any previously-persisted cluster configuration version records, so the monitor will check - * previously-deployed topologies against the current cluster configuration. - */ - private void loadClusterVersionData() { - File persistenceDir = getPersistenceDir(); - if (persistenceDir != null) { - Collection<File> persistedConfigs = FileUtils.listFiles(getPersistenceDir(), new String[]{"ver"}, false); - for (File persisted : persistedConfigs) { - Properties props = new Properties(); - try { - props.load(new FileInputStream(persisted)); - - String source = props.getProperty(PROP_CLUSTER_SOURCE); - String clusterName = props.getProperty(PROP_CLUSTER_NAME); - - Map<String, String> configVersions = new HashMap<>(); - for (String name : props.stringPropertyNames()) { - if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties - configVersions.put(name, props.getProperty(name)); - } - } - - // Map the config versions to the cluster name - addClusterConfigVersions(source, clusterName, configVersions); - - } catch (IOException e) { - log.failedToLoadClusterMonitorConfigVersions(getType(), e); - } - } - } - } - - private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) { - File persistenceDir = getPersistenceDir(); - if (persistenceDir != null) { - - Properties props = new Properties(); - props.setProperty(PROP_CLUSTER_NAME, clusterName); - props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress()); - - String username = sdc.getUser(); - if (username != null) { - props.setProperty(PROP_CLUSTER_USER, username); - } - String pwdAlias = sdc.getPasswordAlias(); - if (pwdAlias != null) { - props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias); - } - - persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName)); - } - } - - private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) { - File persistenceDir = getPersistenceDir(); - if (persistenceDir != null) { - Properties props = new Properties(); - props.setProperty(PROP_CLUSTER_NAME, clusterName); - props.setProperty(PROP_CLUSTER_SOURCE, address); - for (String name : configVersions.keySet()) { - props.setProperty(name, configVersions.get(name)); - } - - persist(props, getConfigVersionsPersistenceFile(address, clusterName)); - } - } - - private void persist(Properties props, File dest) { - try { - props.store(new FileOutputStream(dest), PERSISTED_FILE_COMMENT); - } catch (Exception e) { - log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e); - } - } - - private File getPersistenceDir() { - File persistenceDir = null; - - File dataDir = new File(gatewayConfig.getGatewayDataDir()); - if (dataDir.exists()) { - File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME); - if (!clustersDir.exists()) { - clustersDir.mkdirs(); - } - persistenceDir = clustersDir; - } - - return persistenceDir; - } - - private File getDiscoveryConfigPersistenceFile(String address, String clusterName) { - return getPersistenceFile(address, clusterName, "conf"); - } - - private File getConfigVersionsPersistenceFile(String address, String clusterName) { - return getPersistenceFile(address, clusterName, "ver"); - } - - private File getPersistenceFile(String address, String clusterName, String ext) { - String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext; - return new File(getPersistenceDir(), fileName); - } - - /** - * Add cluster configuration details to the monitor's in-memory record. - * - * @param address An Ambari instance address. - * @param clusterName The name of a cluster associated with the Ambari instance. - * @param configVersions A Map of configuration types and their corresponding versions. - */ - private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) { - configVersionsLock.writeLock().lock(); - try { - ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>()) - .put(clusterName, configVersions); - } finally { - configVersionsLock.writeLock().unlock(); - } - } - - public void start() { - (new Thread(internalMonitor, "AmbariConfigurationMonitor")).start(); - } - - public void stop() { - internalMonitor.stop(); - } - - @Override - public void addListener(ConfigurationChangeListener listener) { - changeListeners.add(listener); - } - - /** - * Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for - * changes. - * - * @param clusterName The name of the cluster. - * @param config The associated service discovery configuration. - */ - void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) { - clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config); - } - - - /** - * Get the service discovery configuration associated with the specified Ambari instance and cluster. - * - * @param address An Ambari instance address. - * @param clusterName The name of a cluster associated with the Ambari instance. - * - * @return The associated ServiceDiscoveryConfig object. - */ - ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) { - ServiceDiscoveryConfig config = null; - if (clusterMonitorConfigurations.containsKey(address)) { - config = clusterMonitorConfigurations.get(address).get(clusterName); - } - return config; - } - - - /** - * Add cluster configuration data to the monitor, which it will use when determining if configuration has changed. - * - * @param cluster An AmbariCluster object. - * @param discoveryConfig The discovery configuration associated with the cluster. - */ - void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) { - - String clusterName = cluster.getName(); - - // Register the cluster discovery configuration for the monitor connections - persistDiscoveryConfiguration(clusterName, discoveryConfig); - addDiscoveryConfig(clusterName, discoveryConfig); - - // Build the set of configuration versions - Map<String, String> configVersions = new HashMap<>(); - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations(); - for (String serviceName : serviceConfigs.keySet()) { - Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName); - for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) { - String configType = config.getType(); - String version = config.getVersion(); - configVersions.put(configType, version); - } - } - - persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions); - addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions); - } - - - /** - * Remove the configuration record for the specified Ambari instance and cluster name. - * - * @param address An Ambari instance address. - * @param clusterName The name of a cluster associated with the Ambari instance. - * - * @return The removed data; A Map of configuration types and their corresponding versions. - */ - Map<String, String> removeClusterConfigVersions(String address, String clusterName) { - Map<String, String> result = new HashMap<>(); - - configVersionsLock.writeLock().lock(); - try { - if (ambariClusterConfigVersions.containsKey(address)) { - result.putAll(ambariClusterConfigVersions.get(address).remove(clusterName)); - } - } finally { - configVersionsLock.writeLock().unlock(); - } - - // Delete the associated persisted record - File persisted = getConfigVersionsPersistenceFile(address, clusterName); - if (persisted.exists()) { - persisted.delete(); - } - - return result; - } - - /** - * Get the cluster configuration details for the specified cluster and Ambari instance. - * - * @param address An Ambari instance address. - * @param clusterName The name of a cluster associated with the Ambari instance. - * - * @return A Map of configuration types and their corresponding versions. - */ - Map<String, String> getClusterConfigVersions(String address, String clusterName) { - Map<String, String> result = new HashMap<>(); - - configVersionsLock.readLock().lock(); - try { - if (ambariClusterConfigVersions.containsKey(address)) { - result.putAll(ambariClusterConfigVersions.get(address).get(clusterName)); - } - } finally { - configVersionsLock.readLock().unlock(); - } - - return result; - } - - - /** - * Get all the clusters the monitor knows about. - * - * @return A Map of Ambari instance addresses to associated cluster names. - */ - Map<String, List<String>> getClusterNames() { - Map<String, List<String>> result = new HashMap<>(); - - configVersionsLock.readLock().lock(); - try { - for (String address : ambariClusterConfigVersions.keySet()) { - List<String> clusterNames = new ArrayList<>(); - clusterNames.addAll(ambariClusterConfigVersions.get(address).keySet()); - result.put(address, clusterNames); - } - } finally { - configVersionsLock.readLock().unlock(); - } - - return result; - - } - - - /** - * Notify registered change listeners. - * - * @param source The address of the Ambari instance from which the cluster details were determined. - * @param clusterName The name of the cluster whose configuration details have changed. - */ - void notifyChangeListeners(String source, String clusterName) { - for (ConfigurationChangeListener listener : changeListeners) { - listener.onConfigurationChange(source, clusterName); - } - } - - - /** - * Request the current active configuration version info from Ambari. - * - * @param address The Ambari instance address. - * @param clusterName The name of the cluster for which the details are desired. - * - * @return A Map of service configuration types and their corresponding versions. - */ - Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { - Map<String, String> configVersions = new HashMap<>(); - - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = - ambariClient.getActiveServiceConfigurations(clusterName, getDiscoveryConfig(address, clusterName)); - - for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) { - for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) { - configVersions.put(config.getType(), config.getVersion()); - } - } - - return configVersions; - } - - - /** - * The thread that polls Ambari for configuration details for clusters associated with discovered topologies, - * compares them with the current recorded values, and notifies any listeners when differences are discovered. - */ - static final class PollingConfigAnalyzer implements Runnable { - - private static final int DEFAULT_POLLING_INTERVAL = 60; - - // Polling interval in seconds - private int interval = DEFAULT_POLLING_INTERVAL; - - private AmbariConfigurationMonitor delegate; - - private boolean isActive = false; - - PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) { - this.delegate = delegate; - this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL); - } - - void setInterval(int interval) { - this.interval = interval; - } - - - void stop() { - isActive = false; - } - - @Override - public void run() { - isActive = true; - - log.startedAmbariConfigMonitor(interval); - - while (isActive) { - for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) { - String address = entry.getKey(); - for (String clusterName : entry.getValue()) { - Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName); - if (configVersions != null && !configVersions.isEmpty()) { - Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName); - if (updatedVersions != null && !updatedVersions.isEmpty()) { - boolean configHasChanged = false; - - // If the config sets don't match in size, then something has changed - if (updatedVersions.size() != configVersions.size()) { - configHasChanged = true; - } else { - // Perform the comparison of all the config versions - for (Map.Entry<String, String> configVersion : configVersions.entrySet()) { - if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) { - configHasChanged = true; - break; - } - } - } - - // If a change has occurred, notify the listeners - if (configHasChanged) { - delegate.notifyChangeListeners(address, clusterName); - } - } - } - } - } - - try { - Thread.sleep(interval * 1000); - } catch (InterruptedException e) { - // Ignore - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java b/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java deleted file mode 100644 index 6a6fad8..0000000 --- a/gateway-discovery-ambari/src/main/java/org/apache/hadoop/gateway/topology/discovery/ambari/RESTInvoker.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.discovery.ambari; - -import net.minidev.json.JSONObject; -import net.minidev.json.JSONValue; -import org.apache.hadoop.gateway.config.ConfigurationException; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.services.security.AliasServiceException; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.message.BasicHeader; -import org.apache.http.util.EntityUtils; - -import java.io.IOException; - -class RESTInvoker { - - private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user"; - private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password"; - - private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); - - private AliasService aliasService = null; - - private CloseableHttpClient httpClient = org.apache.http.impl.client.HttpClients.createDefault(); - - - RESTInvoker(AliasService aliasService) { - this.aliasService = aliasService; - } - - - JSONObject invoke(String url, String username, String passwordAlias) { - JSONObject result = null; - - CloseableHttpResponse response = null; - try { - HttpGet request = new HttpGet(url); - - // If no configured username, then use default username alias - String password = null; - if (username == null) { - if (aliasService != null) { - try { - char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS); - if (defaultUser != null) { - username = new String(defaultUser); - } - } catch (AliasServiceException e) { - log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage()); - } - } - - // If username is still null - if (username == null) { - log.aliasServiceUserNotFound(); - throw new ConfigurationException("No username is configured for Ambari service discovery."); - } - } - - if (aliasService != null) { - // If no password alias is configured, then try the default alias - if (passwordAlias == null) { - passwordAlias = DEFAULT_PWD_ALIAS; - } - - try { - char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias); - if (pwd != null) { - password = new String(pwd); - } - - } catch (AliasServiceException e) { - log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage()); - } - } - - // If the password could not be determined - if (password == null) { - log.aliasServicePasswordNotFound(); - throw new ConfigurationException("No password is configured for Ambari service discovery."); - } - - // Add an auth header if credentials are available - String encodedCreds = - org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes()); - request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds)); - - response = httpClient.execute(request); - - if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { - HttpEntity entity = response.getEntity(); - if (entity != null) { - result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity))); - log.debugJSON(result.toJSONString()); - } else { - log.noJSON(url); - } - } else { - log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode()); - } - - } catch (IOException e) { - log.restInvocationError(url, e); - } finally { - if(response != null) { - try { - response.close(); - } catch (IOException e) { - // Ignore - } - } - } - return result; - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClientCommon.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClientCommon.java b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClientCommon.java new file mode 100644 index 0000000..9e5dcb3 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClientCommon.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.ambari; + +import net.minidev.json.JSONArray; +import net.minidev.json.JSONObject; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; +import org.apache.knox.gateway.services.security.AliasService; +import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; + +import java.util.HashMap; +import java.util.Map; + +class AmbariClientCommon { + + static final String AMBARI_CLUSTERS_URI = "/api/v1/clusters"; + + static final String AMBARI_HOSTROLES_URI = + AMBARI_CLUSTERS_URI + "/%s/services?fields=components/host_components/HostRoles"; + + static final String AMBARI_SERVICECONFIGS_URI = + AMBARI_CLUSTERS_URI + "/%s/configurations/service_config_versions?is_current=true"; + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + private RESTInvoker restClient; + + + AmbariClientCommon(AliasService aliasService) { + this(new RESTInvoker(aliasService)); + } + + + AmbariClientCommon(RESTInvoker restInvoker) { + this.restClient = restInvoker; + } + + + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String clusterName, + ServiceDiscoveryConfig config) { + return getActiveServiceConfigurations(config.getAddress(), + clusterName, + config.getUser(), + config.getPasswordAlias()); + } + + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> getActiveServiceConfigurations(String discoveryAddress, + String clusterName, + String discoveryUser, + String discoveryPwdAlias) { + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigurations = new HashMap<>(); + + String serviceConfigsURL = String.format("%s" + AMBARI_SERVICECONFIGS_URI, discoveryAddress, clusterName); + + JSONObject serviceConfigsJSON = restClient.invoke(serviceConfigsURL, discoveryUser, discoveryPwdAlias); + if (serviceConfigsJSON != null) { + // Process the service configurations + JSONArray serviceConfigs = (JSONArray) serviceConfigsJSON.get("items"); + for (Object serviceConfig : serviceConfigs) { + String serviceName = (String) ((JSONObject) serviceConfig).get("service_name"); + JSONArray configurations = (JSONArray) ((JSONObject) serviceConfig).get("configurations"); + for (Object configuration : configurations) { + String configType = (String) ((JSONObject) configuration).get("type"); + String configVersion = String.valueOf(((JSONObject) configuration).get("version")); + + Map<String, String> configProps = new HashMap<>(); + JSONObject configProperties = (JSONObject) ((JSONObject) configuration).get("properties"); + for (String propertyName : configProperties.keySet()) { + configProps.put(propertyName, String.valueOf(((JSONObject) configProperties).get(propertyName))); + } + if (!serviceConfigurations.containsKey(serviceName)) { + serviceConfigurations.put(serviceName, new HashMap<>()); + } + serviceConfigurations.get(serviceName).put(configType, + new AmbariCluster.ServiceConfiguration(configType, + configVersion, + configProps)); + } + } + } + + return serviceConfigurations; + } + + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java new file mode 100644 index 0000000..95b0280 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariClusterConfigurationMonitorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.ambari; + +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.security.AliasService; +import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor; +import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider; + +public class AmbariClusterConfigurationMonitorProvider implements + ClusterConfigurationMonitorProvider { + + @Override + public String getType() { + return AmbariConfigurationMonitor.getType(); + } + + @Override + public ClusterConfigurationMonitor newInstance(GatewayConfig config, AliasService aliasService) { + return new AmbariConfigurationMonitor(config, aliasService); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java new file mode 100644 index 0000000..c3aa27a --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitor.java @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.ambari; + +import org.apache.commons.io.FileUtils; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; +import org.apache.knox.gateway.services.security.AliasService; +import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor; +import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +class AmbariConfigurationMonitor implements ClusterConfigurationMonitor { + + private static final String TYPE = "Ambari"; + + private static final String CLUSTERS_DATA_DIR_NAME = "clusters"; + + private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!"; + + private static final String PROP_CLUSTER_PREFIX = "cluster."; + private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source"; + private static final String PROP_CLUSTER_NAME = PROP_CLUSTER_PREFIX + "name"; + private static final String PROP_CLUSTER_USER = PROP_CLUSTER_PREFIX + "user"; + private static final String PROP_CLUSTER_ALIAS = PROP_CLUSTER_PREFIX + "pwd.alias"; + + static final String INTERVAL_PROPERTY_NAME = "org.apache.knox.gateway.topology.discovery.ambari.monitor.interval"; + + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + // Ambari address + // clusterName -> ServiceDiscoveryConfig + // + Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>(); + + // Ambari address + // clusterName + // configType -> version + // + Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>(); + + ReadWriteLock configVersionsLock = new ReentrantReadWriteLock(); + + private List<ConfigurationChangeListener> changeListeners = new ArrayList<>(); + + private AmbariClientCommon ambariClient; + + PollingConfigAnalyzer internalMonitor; + + GatewayConfig gatewayConfig = null; + + static String getType() { + return TYPE; + } + + AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) { + this.gatewayConfig = config; + this.ambariClient = new AmbariClientCommon(aliasService); + this.internalMonitor = new PollingConfigAnalyzer(this); + + // Override the default polling interval if it has been configured + int interval = config.getClusterMonitorPollingInterval(getType()); + if (interval > 0) { + setPollingInterval(interval); + } + + init(); + } + + @Override + public void setPollingInterval(int interval) { + internalMonitor.setInterval(interval); + } + + private void init() { + loadDiscoveryConfiguration(); + loadClusterVersionData(); + } + + /** + * Load any previously-persisted service discovery configurations. + * This is necessary for checking previously-deployed topologies. + */ + private void loadDiscoveryConfiguration() { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false); + for (File persisted : persistedConfigs) { + Properties props = new Properties(); + try { + props.load(new FileInputStream(persisted)); + + addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() { + public String getAddress() { + return props.getProperty(PROP_CLUSTER_SOURCE); + } + + public String getUser() { + return props.getProperty(PROP_CLUSTER_USER); + } + + public String getPasswordAlias() { + return props.getProperty(PROP_CLUSTER_ALIAS); + } + }); + } catch (IOException e) { + log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e); + } + } + } + } + + /** + * Load any previously-persisted cluster configuration version records, so the monitor will check + * previously-deployed topologies against the current cluster configuration. + */ + private void loadClusterVersionData() { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Collection<File> persistedConfigs = FileUtils.listFiles(getPersistenceDir(), new String[]{"ver"}, false); + for (File persisted : persistedConfigs) { + Properties props = new Properties(); + try { + props.load(new FileInputStream(persisted)); + + String source = props.getProperty(PROP_CLUSTER_SOURCE); + String clusterName = props.getProperty(PROP_CLUSTER_NAME); + + Map<String, String> configVersions = new HashMap<>(); + for (String name : props.stringPropertyNames()) { + if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties + configVersions.put(name, props.getProperty(name)); + } + } + + // Map the config versions to the cluster name + addClusterConfigVersions(source, clusterName, configVersions); + + } catch (IOException e) { + log.failedToLoadClusterMonitorConfigVersions(getType(), e); + } + } + } + } + + private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + + Properties props = new Properties(); + props.setProperty(PROP_CLUSTER_NAME, clusterName); + props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress()); + + String username = sdc.getUser(); + if (username != null) { + props.setProperty(PROP_CLUSTER_USER, username); + } + String pwdAlias = sdc.getPasswordAlias(); + if (pwdAlias != null) { + props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias); + } + + persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName)); + } + } + + private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) { + File persistenceDir = getPersistenceDir(); + if (persistenceDir != null) { + Properties props = new Properties(); + props.setProperty(PROP_CLUSTER_NAME, clusterName); + props.setProperty(PROP_CLUSTER_SOURCE, address); + for (String name : configVersions.keySet()) { + props.setProperty(name, configVersions.get(name)); + } + + persist(props, getConfigVersionsPersistenceFile(address, clusterName)); + } + } + + private void persist(Properties props, File dest) { + try { + props.store(new FileOutputStream(dest), PERSISTED_FILE_COMMENT); + } catch (Exception e) { + log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e); + } + } + + private File getPersistenceDir() { + File persistenceDir = null; + + File dataDir = new File(gatewayConfig.getGatewayDataDir()); + if (dataDir.exists()) { + File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME); + if (!clustersDir.exists()) { + clustersDir.mkdirs(); + } + persistenceDir = clustersDir; + } + + return persistenceDir; + } + + private File getDiscoveryConfigPersistenceFile(String address, String clusterName) { + return getPersistenceFile(address, clusterName, "conf"); + } + + private File getConfigVersionsPersistenceFile(String address, String clusterName) { + return getPersistenceFile(address, clusterName, "ver"); + } + + private File getPersistenceFile(String address, String clusterName, String ext) { + String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext; + return new File(getPersistenceDir(), fileName); + } + + /** + * Add cluster configuration details to the monitor's in-memory record. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * @param configVersions A Map of configuration types and their corresponding versions. + */ + private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) { + configVersionsLock.writeLock().lock(); + try { + ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>()) + .put(clusterName, configVersions); + } finally { + configVersionsLock.writeLock().unlock(); + } + } + + public void start() { + (new Thread(internalMonitor, "AmbariConfigurationMonitor")).start(); + } + + public void stop() { + internalMonitor.stop(); + } + + @Override + public void addListener(ConfigurationChangeListener listener) { + changeListeners.add(listener); + } + + /** + * Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for + * changes. + * + * @param clusterName The name of the cluster. + * @param config The associated service discovery configuration. + */ + void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) { + clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config); + } + + + /** + * Get the service discovery configuration associated with the specified Ambari instance and cluster. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return The associated ServiceDiscoveryConfig object. + */ + ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) { + ServiceDiscoveryConfig config = null; + if (clusterMonitorConfigurations.containsKey(address)) { + config = clusterMonitorConfigurations.get(address).get(clusterName); + } + return config; + } + + + /** + * Add cluster configuration data to the monitor, which it will use when determining if configuration has changed. + * + * @param cluster An AmbariCluster object. + * @param discoveryConfig The discovery configuration associated with the cluster. + */ + void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) { + + String clusterName = cluster.getName(); + + // Register the cluster discovery configuration for the monitor connections + persistDiscoveryConfiguration(clusterName, discoveryConfig); + addDiscoveryConfig(clusterName, discoveryConfig); + + // Build the set of configuration versions + Map<String, String> configVersions = new HashMap<>(); + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations(); + for (String serviceName : serviceConfigs.keySet()) { + Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName); + for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) { + String configType = config.getType(); + String version = config.getVersion(); + configVersions.put(configType, version); + } + } + + persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions); + addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions); + } + + + /** + * Remove the configuration record for the specified Ambari instance and cluster name. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return The removed data; A Map of configuration types and their corresponding versions. + */ + Map<String, String> removeClusterConfigVersions(String address, String clusterName) { + Map<String, String> result = new HashMap<>(); + + configVersionsLock.writeLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + result.putAll(ambariClusterConfigVersions.get(address).remove(clusterName)); + } + } finally { + configVersionsLock.writeLock().unlock(); + } + + // Delete the associated persisted record + File persisted = getConfigVersionsPersistenceFile(address, clusterName); + if (persisted.exists()) { + persisted.delete(); + } + + return result; + } + + /** + * Get the cluster configuration details for the specified cluster and Ambari instance. + * + * @param address An Ambari instance address. + * @param clusterName The name of a cluster associated with the Ambari instance. + * + * @return A Map of configuration types and their corresponding versions. + */ + Map<String, String> getClusterConfigVersions(String address, String clusterName) { + Map<String, String> result = new HashMap<>(); + + configVersionsLock.readLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + result.putAll(ambariClusterConfigVersions.get(address).get(clusterName)); + } + } finally { + configVersionsLock.readLock().unlock(); + } + + return result; + } + + + /** + * Get all the clusters the monitor knows about. + * + * @return A Map of Ambari instance addresses to associated cluster names. + */ + Map<String, List<String>> getClusterNames() { + Map<String, List<String>> result = new HashMap<>(); + + configVersionsLock.readLock().lock(); + try { + for (String address : ambariClusterConfigVersions.keySet()) { + List<String> clusterNames = new ArrayList<>(); + clusterNames.addAll(ambariClusterConfigVersions.get(address).keySet()); + result.put(address, clusterNames); + } + } finally { + configVersionsLock.readLock().unlock(); + } + + return result; + + } + + + /** + * Notify registered change listeners. + * + * @param source The address of the Ambari instance from which the cluster details were determined. + * @param clusterName The name of the cluster whose configuration details have changed. + */ + void notifyChangeListeners(String source, String clusterName) { + for (ConfigurationChangeListener listener : changeListeners) { + listener.onConfigurationChange(source, clusterName); + } + } + + + /** + * Request the current active configuration version info from Ambari. + * + * @param address The Ambari instance address. + * @param clusterName The name of the cluster for which the details are desired. + * + * @return A Map of service configuration types and their corresponding versions. + */ + Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { + Map<String, String> configVersions = new HashMap<>(); + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = + ambariClient.getActiveServiceConfigurations(clusterName, getDiscoveryConfig(address, clusterName)); + + for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) { + for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) { + configVersions.put(config.getType(), config.getVersion()); + } + } + + return configVersions; + } + + + /** + * The thread that polls Ambari for configuration details for clusters associated with discovered topologies, + * compares them with the current recorded values, and notifies any listeners when differences are discovered. + */ + static final class PollingConfigAnalyzer implements Runnable { + + private static final int DEFAULT_POLLING_INTERVAL = 60; + + // Polling interval in seconds + private int interval = DEFAULT_POLLING_INTERVAL; + + private AmbariConfigurationMonitor delegate; + + private boolean isActive = false; + + PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) { + this.delegate = delegate; + this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL); + } + + void setInterval(int interval) { + this.interval = interval; + } + + + void stop() { + isActive = false; + } + + @Override + public void run() { + isActive = true; + + log.startedAmbariConfigMonitor(interval); + + while (isActive) { + for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) { + String address = entry.getKey(); + for (String clusterName : entry.getValue()) { + Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName); + if (configVersions != null && !configVersions.isEmpty()) { + Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName); + if (updatedVersions != null && !updatedVersions.isEmpty()) { + boolean configHasChanged = false; + + // If the config sets don't match in size, then something has changed + if (updatedVersions.size() != configVersions.size()) { + configHasChanged = true; + } else { + // Perform the comparison of all the config versions + for (Map.Entry<String, String> configVersion : configVersions.entrySet()) { + if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) { + configHasChanged = true; + break; + } + } + } + + // If a change has occurred, notify the listeners + if (configHasChanged) { + delegate.notifyChangeListeners(address, clusterName); + } + } + } + } + } + + try { + Thread.sleep(interval * 1000); + } catch (InterruptedException e) { + // Ignore + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/RESTInvoker.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/RESTInvoker.java b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/RESTInvoker.java new file mode 100644 index 0000000..8830115 --- /dev/null +++ b/gateway-discovery-ambari/src/main/java/org/apache/knox/gateway/topology/discovery/ambari/RESTInvoker.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.ambari; + +import net.minidev.json.JSONObject; +import net.minidev.json.JSONValue; +import org.apache.knox.gateway.config.ConfigurationException; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; +import org.apache.knox.gateway.services.security.AliasService; +import org.apache.knox.gateway.services.security.AliasServiceException; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicHeader; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; + +class RESTInvoker { + + private static final String DEFAULT_USER_ALIAS = "ambari.discovery.user"; + private static final String DEFAULT_PWD_ALIAS = "ambari.discovery.password"; + + private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class); + + private AliasService aliasService = null; + + private CloseableHttpClient httpClient = org.apache.http.impl.client.HttpClients.createDefault(); + + + RESTInvoker(AliasService aliasService) { + this.aliasService = aliasService; + } + + + JSONObject invoke(String url, String username, String passwordAlias) { + JSONObject result = null; + + CloseableHttpResponse response = null; + try { + HttpGet request = new HttpGet(url); + + // If no configured username, then use default username alias + String password = null; + if (username == null) { + if (aliasService != null) { + try { + char[] defaultUser = aliasService.getPasswordFromAliasForGateway(DEFAULT_USER_ALIAS); + if (defaultUser != null) { + username = new String(defaultUser); + } + } catch (AliasServiceException e) { + log.aliasServiceUserError(DEFAULT_USER_ALIAS, e.getLocalizedMessage()); + } + } + + // If username is still null + if (username == null) { + log.aliasServiceUserNotFound(); + throw new ConfigurationException("No username is configured for Ambari service discovery."); + } + } + + if (aliasService != null) { + // If no password alias is configured, then try the default alias + if (passwordAlias == null) { + passwordAlias = DEFAULT_PWD_ALIAS; + } + + try { + char[] pwd = aliasService.getPasswordFromAliasForGateway(passwordAlias); + if (pwd != null) { + password = new String(pwd); + } + + } catch (AliasServiceException e) { + log.aliasServicePasswordError(passwordAlias, e.getLocalizedMessage()); + } + } + + // If the password could not be determined + if (password == null) { + log.aliasServicePasswordNotFound(); + throw new ConfigurationException("No password is configured for Ambari service discovery."); + } + + // Add an auth header if credentials are available + String encodedCreds = + org.apache.commons.codec.binary.Base64.encodeBase64String((username + ":" + password).getBytes()); + request.addHeader(new BasicHeader("Authorization", "Basic " + encodedCreds)); + + response = httpClient.execute(request); + + if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + result = (JSONObject) JSONValue.parse((EntityUtils.toString(entity))); + log.debugJSON(result.toJSONString()); + } else { + log.noJSON(url); + } + } else { + log.unexpectedRestResponseStatusCode(url, response.getStatusLine().getStatusCode()); + } + + } catch (IOException e) { + log.restInvocationError(url, e); + } finally { + if(response != null) { + try { + response.close(); + } catch (IOException e) { + // Ignore + } + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider deleted file mode 100644 index d9b2b05..0000000 --- a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider +++ /dev/null @@ -1,19 +0,0 @@ -########################################################################## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -########################################################################## - -org.apache.hadoop.gateway.topology.discovery.ambari.AmbariClusterConfigurationMonitorProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider new file mode 100644 index 0000000..280485f --- /dev/null +++ b/gateway-discovery-ambari/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider @@ -0,0 +1,19 @@ +########################################################################## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################## + +org.apache.knox.gateway.topology.discovery.ambari.AmbariClusterConfigurationMonitorProvider \ No newline at end of file