http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java b/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java deleted file mode 100644 index 2d8b276..0000000 --- a/gateway-discovery-ambari/src/test/java/org/apache/hadoop/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java +++ /dev/null @@ -1,319 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.discovery.ambari; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -public class AmbariConfigurationMonitorTest { - - private File dataDir = null; - - @Before - public void setup() throws Exception { - File targetDir = new File( System.getProperty("user.dir"), "target"); - File tempDir = new File(targetDir, this.getClass().getName() + "__data__" + UUID.randomUUID()); - FileUtils.forceMkdir(tempDir); - dataDir = tempDir; - } - - @After - public void tearDown() throws Exception { - dataDir.delete(); - } - - @Test - public void testPollingMonitor() throws Exception { - final String addr1 = "http://host1:8080"; - final String addr2 = "http://host2:8080"; - final String cluster1Name = "Cluster_One"; - final String cluster2Name = "Cluster_Two"; - - - GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class); - EasyMock.expect(config.getGatewayDataDir()).andReturn(dataDir.getAbsolutePath()).anyTimes(); - EasyMock.expect(config.getClusterMonitorPollingInterval(AmbariConfigurationMonitor.getType())) - .andReturn(10) - .anyTimes(); - EasyMock.replay(config); - - // Create the monitor - TestableAmbariConfigurationMonitor monitor = new TestableAmbariConfigurationMonitor(config); - - // Clear the system property now that the monitor has been initialized - System.clearProperty(AmbariConfigurationMonitor.INTERVAL_PROPERTY_NAME); - - - // Sequence of config changes for testing monitoring for updates - Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updateConfigurations = new HashMap<>(); - - updateConfigurations.put(addr1, new HashMap<>()); - updateConfigurations.get(addr1).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), - createTestServiceConfig("hive-site", "2")), - Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), - createTestServiceConfig("hive-site", "3")), - Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), - createTestServiceConfig("hive-site", "1")))); - - updateConfigurations.put(addr2, new HashMap<>()); - updateConfigurations.get(addr2).put(cluster2Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), - createTestServiceConfig("hive-site", "1")), - Collections.singletonList(createTestServiceConfig("zoo.cfg", "1")), - Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), - createTestServiceConfig("hive-site", "2")))); - - updateConfigurations.get(addr2).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), - createTestServiceConfig("hive-site", "4")), - Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), - createTestServiceConfig("hive-site", "4"), - createTestServiceConfig("yarn-site", "1")), - Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), - createTestServiceConfig("hive-site", "2")))); - - Map<String, Map<String, Integer>> configChangeIndex = new HashMap<>(); - configChangeIndex.put(addr1, new HashMap<>()); - configChangeIndex.get(addr1).put(cluster1Name, 0); - configChangeIndex.get(addr1).put(cluster2Name, 0); - configChangeIndex.put(addr2, new HashMap<>()); - configChangeIndex.get(addr2).put(cluster2Name, 0); - - // Setup the initial test update data - // Cluster 1 data change - monitor.addTestConfigVersion(addr1, cluster1Name, "zoo.cfg", "2"); - monitor.addTestConfigVersion(addr1, cluster1Name, "hive-site", "1"); - - // Cluster 2 NO data change - monitor.addTestConfigVersion(addr2, cluster1Name, "zoo.cfg", "1"); - monitor.addTestConfigVersion(addr2, cluster1Name, "hive-site", "1"); - - // Cluster 3 data change - monitor.addTestConfigVersion(addr2, cluster2Name, "zoo.cfg", "1"); - monitor.addTestConfigVersion(addr2, cluster2Name, "hive-site", "2"); - - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> initialAmbariClusterConfigs = new HashMap<>(); - - Map<String, AmbariCluster.ServiceConfiguration> cluster1Configs = new HashMap<>(); - AmbariCluster.ServiceConfiguration zooCfg = createTestServiceConfig("zoo.cfg", "1"); - cluster1Configs.put("ZOOKEEPER", zooCfg); - - AmbariCluster.ServiceConfiguration hiveSite = createTestServiceConfig("hive-site", "1"); - cluster1Configs.put("Hive", hiveSite); - - initialAmbariClusterConfigs.put(cluster1Name, cluster1Configs); - AmbariCluster cluster1 = createTestCluster(cluster1Name, initialAmbariClusterConfigs); - - // Tell the monitor about the cluster configurations - monitor.addClusterConfigVersions(cluster1, createTestDiscoveryConfig(addr1)); - - monitor.addClusterConfigVersions(createTestCluster(cluster2Name, initialAmbariClusterConfigs), - createTestDiscoveryConfig(addr2)); - - monitor.addClusterConfigVersions(createTestCluster(cluster1Name, initialAmbariClusterConfigs), - createTestDiscoveryConfig(addr2)); - - final Map<String, Map<String, Integer>> changeNotifications = new HashMap<>(); - monitor.addListener((src, cname) -> { -// System.out.println("Cluster config changed: " + cname + " @ " + src); - // Record the notification - Integer notificationCount = changeNotifications.computeIfAbsent(src, s -> new HashMap<>()) - .computeIfAbsent(cname, c -> Integer.valueOf(0)); - changeNotifications.get(src).put(cname, (notificationCount+=1)); - - // Update the config version - int changeIndex = configChangeIndex.get(src).get(cname); - if (changeIndex < updateConfigurations.get(src).get(cname).size()) { - List<AmbariCluster.ServiceConfiguration> changes = updateConfigurations.get(src).get(cname).get(changeIndex); - -// System.out.println("Applying config update " + changeIndex + " to " + cname + " @ " + src + " ..."); - for (AmbariCluster.ServiceConfiguration change : changes) { - monitor.updateConfigState(src, cname, change.getType(), change.getVersion()); -// System.out.println(" Updated " + change.getType() + " to version " + change.getVersion()); - } - - // Increment the change index - configChangeIndex.get(src).replace(cname, changeIndex + 1); - -// System.out.println("Monitor config updated for " + cname + " @ " + src + " : " + changeIndex ); - } - }); - - try { - monitor.start(); - - long expiration = System.currentTimeMillis() + (1000 * 30); - while (!areChangeUpdatesExhausted(updateConfigurations, configChangeIndex) - && (System.currentTimeMillis() < expiration)) { - try { - Thread.sleep(5); - } catch (InterruptedException e) { - // - } - } - - } finally { - monitor.stop(); - } - - assertNotNull("Expected changes to have been reported for source 1.", - changeNotifications.get(addr1)); - - assertEquals("Expected changes to have been reported.", - 3, changeNotifications.get(addr1).get(cluster1Name).intValue()); - - assertNotNull("Expected changes to have been reported for source 2.", - changeNotifications.get(addr2)); - - assertEquals("Expected changes to have been reported.", - 3, changeNotifications.get(addr2).get(cluster2Name).intValue()); - - assertNull("Expected changes to have been reported.", - changeNotifications.get(addr2).get(cluster1Name)); - } - - - private static boolean areChangeUpdatesExhausted(Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updates, - Map<String, Map<String, Integer>> configChangeIndeces) { - boolean isExhausted = true; - - for (String address : updates.keySet()) { - Map<String, List<List<AmbariCluster.ServiceConfiguration>>> clusterConfigs = updates.get(address); - for (String clusterName : clusterConfigs.keySet()) { - Integer configChangeCount = clusterConfigs.get(clusterName).size(); - if (configChangeIndeces.get(address).containsKey(clusterName)) { - if (configChangeIndeces.get(address).get(clusterName) < configChangeCount) { - isExhausted = false; - break; - } - } - } - } - - return isExhausted; - } - - /** - * - * @param name The cluster name - * @param serviceConfigs A map of service configurations (keyed by service name) - * - * @return - */ - private AmbariCluster createTestCluster(String name, - Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs) { - AmbariCluster c = EasyMock.createNiceMock(AmbariCluster.class); - EasyMock.expect(c.getName()).andReturn(name).anyTimes(); - EasyMock.expect(c.getServiceConfigurations()).andReturn(serviceConfigs).anyTimes(); - EasyMock.replay(c); - return c; - } - - private AmbariCluster.ServiceConfiguration createTestServiceConfig(String name, String version) { - AmbariCluster.ServiceConfiguration sc = EasyMock.createNiceMock(AmbariCluster.ServiceConfiguration.class); - EasyMock.expect(sc.getType()).andReturn(name).anyTimes(); - EasyMock.expect(sc.getVersion()).andReturn(version).anyTimes(); - EasyMock.replay(sc); - return sc; - } - - private ServiceDiscoveryConfig createTestDiscoveryConfig(String address) { - return createTestDiscoveryConfig(address, null, null); - } - - private ServiceDiscoveryConfig createTestDiscoveryConfig(String address, String username, String pwdAlias) { - ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class); - EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes(); - EasyMock.expect(sdc.getUser()).andReturn(username).anyTimes(); - EasyMock.expect(sdc.getPasswordAlias()).andReturn(pwdAlias).anyTimes(); - EasyMock.replay(sdc); - return sdc; - } - - /** - * AmbariConfigurationMonitor extension that replaces the collection of updated configuration data with a static - * mechanism rather than the REST invocation mechanism. - */ - private static final class TestableAmbariConfigurationMonitor extends AmbariConfigurationMonitor { - - Map<String, Map<String, Map<String, String>>> configVersionData = new HashMap<>(); - - TestableAmbariConfigurationMonitor(GatewayConfig config) { - super(config, null); - } - - void addTestConfigVersion(String address, String clusterName, String configType, String configVersion) { - configVersionData.computeIfAbsent(address, a -> new HashMap<>()) - .computeIfAbsent(clusterName, cl -> new HashMap<>()) - .put(configType, configVersion); - } - - void addTestConfigVersions(String address, String clusterName, Map<String, String> configVersions) { - configVersionData.computeIfAbsent(address, a -> new HashMap<>()) - .computeIfAbsent(clusterName, cl -> new HashMap<>()) - .putAll(configVersions); - } - - void updateTestConfigVersion(String address, String clusterName, String configType, String updatedVersions) { - configVersionData.computeIfAbsent(address, a -> new HashMap<>()) - .computeIfAbsent(clusterName, cl -> new HashMap<>()) - .replace(configType, updatedVersions); - } - - void updateTestConfigVersions(String address, String clusterName, Map<String, String> updatedVersions) { - configVersionData.computeIfAbsent(address, a -> new HashMap<>()) - .computeIfAbsent(clusterName, cl -> new HashMap<>()) - .replaceAll((k,v) -> updatedVersions.get(k)); - } - - void updateConfigState(String address, String clusterName, String configType, String configVersion) { - configVersionsLock.writeLock().lock(); - try { - if (ambariClusterConfigVersions.containsKey(address)) { - ambariClusterConfigVersions.get(address).get(clusterName).replace(configType, configVersion); - } - } finally { - configVersionsLock.writeLock().unlock(); - } - } - - @Override - Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { - Map<String, Map<String, String>> clusterConfigVersions = configVersionData.get(address); - if (clusterConfigVersions != null) { - return clusterConfigVersions.get(clusterName); - } - return null; - } - } - -}
http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-discovery-ambari/src/test/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java ---------------------------------------------------------------------- diff --git a/gateway-discovery-ambari/src/test/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java b/gateway-discovery-ambari/src/test/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java new file mode 100644 index 0000000..7411545 --- /dev/null +++ b/gateway-discovery-ambari/src/test/java/org/apache/knox/gateway/topology/discovery/ambari/AmbariConfigurationMonitorTest.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.discovery.ambari; + +import org.apache.commons.io.FileUtils; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class AmbariConfigurationMonitorTest { + + private File dataDir = null; + + @Before + public void setup() throws Exception { + File targetDir = new File( System.getProperty("user.dir"), "target"); + File tempDir = new File(targetDir, this.getClass().getName() + "__data__" + UUID.randomUUID()); + FileUtils.forceMkdir(tempDir); + dataDir = tempDir; + } + + @After + public void tearDown() throws Exception { + dataDir.delete(); + } + + @Test + public void testPollingMonitor() throws Exception { + final String addr1 = "http://host1:8080"; + final String addr2 = "http://host2:8080"; + final String cluster1Name = "Cluster_One"; + final String cluster2Name = "Cluster_Two"; + + + GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class); + EasyMock.expect(config.getGatewayDataDir()).andReturn(dataDir.getAbsolutePath()).anyTimes(); + EasyMock.expect(config.getClusterMonitorPollingInterval(AmbariConfigurationMonitor.getType())) + .andReturn(10) + .anyTimes(); + EasyMock.replay(config); + + // Create the monitor + TestableAmbariConfigurationMonitor monitor = new TestableAmbariConfigurationMonitor(config); + + // Clear the system property now that the monitor has been initialized + System.clearProperty(AmbariConfigurationMonitor.INTERVAL_PROPERTY_NAME); + + + // Sequence of config changes for testing monitoring for updates + Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updateConfigurations = new HashMap<>(); + + updateConfigurations.put(addr1, new HashMap<>()); + updateConfigurations.get(addr1).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "2")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "3")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), + createTestServiceConfig("hive-site", "1")))); + + updateConfigurations.put(addr2, new HashMap<>()); + updateConfigurations.get(addr2).put(cluster2Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "1")), + Collections.singletonList(createTestServiceConfig("zoo.cfg", "1")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "2")))); + + updateConfigurations.get(addr2).put(cluster1Name, Arrays.asList(Arrays.asList(createTestServiceConfig("zoo.cfg", "2"), + createTestServiceConfig("hive-site", "4")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "3"), + createTestServiceConfig("hive-site", "4"), + createTestServiceConfig("yarn-site", "1")), + Arrays.asList(createTestServiceConfig("zoo.cfg", "1"), + createTestServiceConfig("hive-site", "2")))); + + Map<String, Map<String, Integer>> configChangeIndex = new HashMap<>(); + configChangeIndex.put(addr1, new HashMap<>()); + configChangeIndex.get(addr1).put(cluster1Name, 0); + configChangeIndex.get(addr1).put(cluster2Name, 0); + configChangeIndex.put(addr2, new HashMap<>()); + configChangeIndex.get(addr2).put(cluster2Name, 0); + + // Setup the initial test update data + // Cluster 1 data change + monitor.addTestConfigVersion(addr1, cluster1Name, "zoo.cfg", "2"); + monitor.addTestConfigVersion(addr1, cluster1Name, "hive-site", "1"); + + // Cluster 2 NO data change + monitor.addTestConfigVersion(addr2, cluster1Name, "zoo.cfg", "1"); + monitor.addTestConfigVersion(addr2, cluster1Name, "hive-site", "1"); + + // Cluster 3 data change + monitor.addTestConfigVersion(addr2, cluster2Name, "zoo.cfg", "1"); + monitor.addTestConfigVersion(addr2, cluster2Name, "hive-site", "2"); + + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> initialAmbariClusterConfigs = new HashMap<>(); + + Map<String, AmbariCluster.ServiceConfiguration> cluster1Configs = new HashMap<>(); + AmbariCluster.ServiceConfiguration zooCfg = createTestServiceConfig("zoo.cfg", "1"); + cluster1Configs.put("ZOOKEEPER", zooCfg); + + AmbariCluster.ServiceConfiguration hiveSite = createTestServiceConfig("hive-site", "1"); + cluster1Configs.put("Hive", hiveSite); + + initialAmbariClusterConfigs.put(cluster1Name, cluster1Configs); + AmbariCluster cluster1 = createTestCluster(cluster1Name, initialAmbariClusterConfigs); + + // Tell the monitor about the cluster configurations + monitor.addClusterConfigVersions(cluster1, createTestDiscoveryConfig(addr1)); + + monitor.addClusterConfigVersions(createTestCluster(cluster2Name, initialAmbariClusterConfigs), + createTestDiscoveryConfig(addr2)); + + monitor.addClusterConfigVersions(createTestCluster(cluster1Name, initialAmbariClusterConfigs), + createTestDiscoveryConfig(addr2)); + + final Map<String, Map<String, Integer>> changeNotifications = new HashMap<>(); + monitor.addListener((src, cname) -> { +// System.out.println("Cluster config changed: " + cname + " @ " + src); + // Record the notification + Integer notificationCount = changeNotifications.computeIfAbsent(src, s -> new HashMap<>()) + .computeIfAbsent(cname, c -> Integer.valueOf(0)); + changeNotifications.get(src).put(cname, (notificationCount+=1)); + + // Update the config version + int changeIndex = configChangeIndex.get(src).get(cname); + if (changeIndex < updateConfigurations.get(src).get(cname).size()) { + List<AmbariCluster.ServiceConfiguration> changes = updateConfigurations.get(src).get(cname).get(changeIndex); + +// System.out.println("Applying config update " + changeIndex + " to " + cname + " @ " + src + " ..."); + for (AmbariCluster.ServiceConfiguration change : changes) { + monitor.updateConfigState(src, cname, change.getType(), change.getVersion()); +// System.out.println(" Updated " + change.getType() + " to version " + change.getVersion()); + } + + // Increment the change index + configChangeIndex.get(src).replace(cname, changeIndex + 1); + +// System.out.println("Monitor config updated for " + cname + " @ " + src + " : " + changeIndex ); + } + }); + + try { + monitor.start(); + + long expiration = System.currentTimeMillis() + (1000 * 30); + while (!areChangeUpdatesExhausted(updateConfigurations, configChangeIndex) + && (System.currentTimeMillis() < expiration)) { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + // + } + } + + } finally { + monitor.stop(); + } + + assertNotNull("Expected changes to have been reported for source 1.", + changeNotifications.get(addr1)); + + assertEquals("Expected changes to have been reported.", + 3, changeNotifications.get(addr1).get(cluster1Name).intValue()); + + assertNotNull("Expected changes to have been reported for source 2.", + changeNotifications.get(addr2)); + + assertEquals("Expected changes to have been reported.", + 3, changeNotifications.get(addr2).get(cluster2Name).intValue()); + + assertNull("Expected changes to have been reported.", + changeNotifications.get(addr2).get(cluster1Name)); + } + + + private static boolean areChangeUpdatesExhausted(Map<String, Map<String, List<List<AmbariCluster.ServiceConfiguration>>>> updates, + Map<String, Map<String, Integer>> configChangeIndeces) { + boolean isExhausted = true; + + for (String address : updates.keySet()) { + Map<String, List<List<AmbariCluster.ServiceConfiguration>>> clusterConfigs = updates.get(address); + for (String clusterName : clusterConfigs.keySet()) { + Integer configChangeCount = clusterConfigs.get(clusterName).size(); + if (configChangeIndeces.get(address).containsKey(clusterName)) { + if (configChangeIndeces.get(address).get(clusterName) < configChangeCount) { + isExhausted = false; + break; + } + } + } + } + + return isExhausted; + } + + /** + * + * @param name The cluster name + * @param serviceConfigs A map of service configurations (keyed by service name) + * + * @return + */ + private AmbariCluster createTestCluster(String name, + Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs) { + AmbariCluster c = EasyMock.createNiceMock(AmbariCluster.class); + EasyMock.expect(c.getName()).andReturn(name).anyTimes(); + EasyMock.expect(c.getServiceConfigurations()).andReturn(serviceConfigs).anyTimes(); + EasyMock.replay(c); + return c; + } + + private AmbariCluster.ServiceConfiguration createTestServiceConfig(String name, String version) { + AmbariCluster.ServiceConfiguration sc = EasyMock.createNiceMock(AmbariCluster.ServiceConfiguration.class); + EasyMock.expect(sc.getType()).andReturn(name).anyTimes(); + EasyMock.expect(sc.getVersion()).andReturn(version).anyTimes(); + EasyMock.replay(sc); + return sc; + } + + private ServiceDiscoveryConfig createTestDiscoveryConfig(String address) { + return createTestDiscoveryConfig(address, null, null); + } + + private ServiceDiscoveryConfig createTestDiscoveryConfig(String address, String username, String pwdAlias) { + ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class); + EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes(); + EasyMock.expect(sdc.getUser()).andReturn(username).anyTimes(); + EasyMock.expect(sdc.getPasswordAlias()).andReturn(pwdAlias).anyTimes(); + EasyMock.replay(sdc); + return sdc; + } + + /** + * AmbariConfigurationMonitor extension that replaces the collection of updated configuration data with a static + * mechanism rather than the REST invocation mechanism. + */ + private static final class TestableAmbariConfigurationMonitor extends AmbariConfigurationMonitor { + + Map<String, Map<String, Map<String, String>>> configVersionData = new HashMap<>(); + + TestableAmbariConfigurationMonitor(GatewayConfig config) { + super(config, null); + } + + void addTestConfigVersion(String address, String clusterName, String configType, String configVersion) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .put(configType, configVersion); + } + + void addTestConfigVersions(String address, String clusterName, Map<String, String> configVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .putAll(configVersions); + } + + void updateTestConfigVersion(String address, String clusterName, String configType, String updatedVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .replace(configType, updatedVersions); + } + + void updateTestConfigVersions(String address, String clusterName, Map<String, String> updatedVersions) { + configVersionData.computeIfAbsent(address, a -> new HashMap<>()) + .computeIfAbsent(clusterName, cl -> new HashMap<>()) + .replaceAll((k,v) -> updatedVersions.get(k)); + } + + void updateConfigState(String address, String clusterName, String configType, String configVersion) { + configVersionsLock.writeLock().lock(); + try { + if (ambariClusterConfigVersions.containsKey(address)) { + ambariClusterConfigVersions.get(address).get(clusterName).replace(configType, configVersion); + } + } finally { + configVersionsLock.writeLock().unlock(); + } + } + + @Override + Map<String, String> getUpdatedConfigVersions(String address, String clusterName) { + Map<String, Map<String, String>> clusterConfigVersions = configVersionData.get(address); + if (clusterConfigVersions != null) { + return clusterConfigVersions.get(clusterName); + } + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java deleted file mode 100644 index 342ce11..0000000 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.services.topology.impl; - -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.services.ServiceLifecycleException; -import org.apache.hadoop.gateway.services.security.AliasService; -import org.apache.hadoop.gateway.topology.ClusterConfigurationMonitorService; -import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor; -import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitorProvider; - -import java.util.HashMap; -import java.util.Map; -import java.util.ServiceLoader; - - -public class DefaultClusterConfigurationMonitorService implements ClusterConfigurationMonitorService { - - private AliasService aliasService = null; - - private Map<String, ClusterConfigurationMonitor> monitors = new HashMap<>(); - - @Override - public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException { - ServiceLoader<ClusterConfigurationMonitorProvider> providers = - ServiceLoader.load(ClusterConfigurationMonitorProvider.class); - for (ClusterConfigurationMonitorProvider provider : providers) { - // Check the gateway configuration to determine if this type of monitor is enabled - if (config.isClusterMonitorEnabled(provider.getType())) { - ClusterConfigurationMonitor monitor = provider.newInstance(config, aliasService); - if (monitor != null) { - monitors.put(provider.getType(), monitor); - } - } - } - } - - @Override - public void start() { - for (ClusterConfigurationMonitor monitor : monitors.values()) { - monitor.start(); - } - } - - @Override - public void stop() { - for (ClusterConfigurationMonitor monitor : monitors.values()) { - monitor.stop(); - } - } - - @Override - public ClusterConfigurationMonitor getMonitor(String type) { - return monitors.get(type); - } - - @Override - public void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener) { - for (ClusterConfigurationMonitor monitor : monitors.values()) { - monitor.addListener(listener); - } - } - - public void setAliasService(AliasService aliasService) { - this.aliasService = aliasService; - } -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java deleted file mode 100644 index 7b34e3d..0000000 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.monitor; - -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService; - - -public class DefaultConfigurationMonitorProvider implements RemoteConfigurationMonitorProvider { - - @Override - public RemoteConfigurationMonitor newInstance(final GatewayConfig config, - final RemoteConfigurationRegistryClientService clientService) { - return new DefaultRemoteConfigurationMonitor(config, clientService); - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java deleted file mode 100644 index af60058..0000000 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.monitor; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.gateway.GatewayMessages; -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.ChildEntryListener; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.EntryListener; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService; -import org.apache.zookeeper.ZooDefs; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - - -class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor { - - private static final String NODE_KNOX = "/knox"; - private static final String NODE_KNOX_CONFIG = NODE_KNOX + "/config"; - private static final String NODE_KNOX_PROVIDERS = NODE_KNOX_CONFIG + "/shared-providers"; - private static final String NODE_KNOX_DESCRIPTORS = NODE_KNOX_CONFIG + "/descriptors"; - - private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class); - - // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported - private static final RemoteConfigurationRegistryClient.EntryACL AUTHENTICATED_USERS_ALL; - static { - AUTHENTICATED_USERS_ALL = new RemoteConfigurationRegistryClient.EntryACL() { - public String getId() { - return ""; - } - - public String getType() { - return "auth"; - } - - public Object getPermissions() { - return ZooDefs.Perms.ALL; - } - - public boolean canRead() { - return true; - } - - public boolean canWrite() { - return true; - } - }; - } - - private RemoteConfigurationRegistryClient client = null; - - private File providersDir; - private File descriptorsDir; - - /** - * @param config The gateway configuration - * @param registryClientService The service from which the remote registry client should be acquired. - */ - DefaultRemoteConfigurationMonitor(GatewayConfig config, - RemoteConfigurationRegistryClientService registryClientService) { - this.providersDir = new File(config.getGatewayProvidersConfigDir()); - this.descriptorsDir = new File(config.getGatewayDescriptorsDir()); - - if (registryClientService != null) { - String clientName = config.getRemoteConfigurationMonitorClientName(); - if (clientName != null) { - this.client = registryClientService.get(clientName); - if (this.client == null) { - log.unresolvedClientConfigurationForRemoteMonitoring(clientName); - } - } else { - log.missingClientConfigurationForRemoteMonitoring(); - } - } - } - - @Override - public void start() throws Exception { - if (client == null) { - throw new IllegalStateException("Failed to acquire a remote configuration registry client."); - } - - final String monitorSource = client.getAddress(); - log.startingRemoteConfigurationMonitor(monitorSource); - - // Ensure the existence of the expected entries and their associated ACLs - ensureEntries(); - - // Confirm access to the remote provider configs directory znode - List<String> providerConfigs = client.listChildEntries(NODE_KNOX_PROVIDERS); - if (providerConfigs == null) { - // Either the ZNode does not exist, or there is an authentication problem - throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_PROVIDERS); - } - - // Confirm access to the remote descriptors directory znode - List<String> descriptors = client.listChildEntries(NODE_KNOX_DESCRIPTORS); - if (descriptors == null) { - // Either the ZNode does not exist, or there is an authentication problem - throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_DESCRIPTORS); - } - - // Register a listener for provider config znode additions/removals - client.addChildEntryListener(NODE_KNOX_PROVIDERS, new ConfigDirChildEntryListener(providersDir)); - - // Register a listener for descriptor znode additions/removals - client.addChildEntryListener(NODE_KNOX_DESCRIPTORS, new ConfigDirChildEntryListener(descriptorsDir)); - - log.monitoringRemoteConfigurationSource(monitorSource); - } - - - @Override - public void stop() throws Exception { - client.removeEntryListener(NODE_KNOX_PROVIDERS); - client.removeEntryListener(NODE_KNOX_DESCRIPTORS); - } - - private void ensureEntries() { - ensureEntry(NODE_KNOX); - ensureEntry(NODE_KNOX_CONFIG); - ensureEntry(NODE_KNOX_PROVIDERS); - ensureEntry(NODE_KNOX_DESCRIPTORS); - } - - private void ensureEntry(String name) { - if (!client.entryExists(name)) { - client.createEntry(name); - } else { - // Validate the ACL - List<RemoteConfigurationRegistryClient.EntryACL> entryACLs = client.getACL(name); - for (RemoteConfigurationRegistryClient.EntryACL entryACL : entryACLs) { - // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported - // For now, check for ZooKeeper world:anyone with ANY permissions (even read-only) - if (entryACL.getType().equals("world") && entryACL.getId().equals("anyone")) { - log.suspectWritableRemoteConfigurationEntry(name); - - // If the client is authenticated, but "anyone" can write the content, then the content may not - // be trustworthy. - if (client.isAuthenticationConfigured()) { - log.correctingSuspectWritableRemoteConfigurationEntry(name); - - // Replace the existing ACL with one that permits only authenticated users - client.setACL(name, Collections.singletonList(AUTHENTICATED_USERS_ALL)); - } - } - } - } - } - - private static class ConfigDirChildEntryListener implements ChildEntryListener { - File localDir; - - ConfigDirChildEntryListener(File localDir) { - this.localDir = localDir; - } - - @Override - public void childEvent(RemoteConfigurationRegistryClient client, Type type, String path) { - File localFile = new File(localDir, path.substring(path.lastIndexOf("/") + 1)); - - switch (type) { - case REMOVED: - FileUtils.deleteQuietly(localFile); - log.deletedRemoteConfigFile(localDir.getName(), localFile.getName()); - try { - client.removeEntryListener(path); - } catch (Exception e) { - log.errorRemovingRemoteConfigurationListenerForPath(path, e); - } - break; - case ADDED: - try { - client.addEntryListener(path, new ConfigEntryListener(localDir)); - } catch (Exception e) { - log.errorAddingRemoteConfigurationListenerForPath(path, e); - } - break; - } - } - } - - private static class ConfigEntryListener implements EntryListener { - private File localDir; - - ConfigEntryListener(File localDir) { - this.localDir = localDir; - } - - @Override - public void entryChanged(RemoteConfigurationRegistryClient client, String path, byte[] data) { - File localFile = new File(localDir, path.substring(path.lastIndexOf("/"))); - if (data != null) { - try { - FileUtils.writeByteArrayToFile(localFile, data); - log.downloadedRemoteConfigFile(localDir.getName(), localFile.getName()); - } catch (IOException e) { - log.errorDownloadingRemoteConfiguration(path, e); - } - } else { - FileUtils.deleteQuietly(localFile); - log.deletedRemoteConfigFile(localDir.getName(), localFile.getName()); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java deleted file mode 100644 index 4d2df45..0000000 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.gateway.topology.monitor; - -import org.apache.hadoop.gateway.GatewayMessages; -import org.apache.hadoop.gateway.GatewayServer; -import org.apache.hadoop.gateway.config.GatewayConfig; -import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; -import org.apache.hadoop.gateway.services.GatewayServices; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService; - -import java.util.ServiceLoader; - -public class RemoteConfigurationMonitorFactory { - - private static final GatewayMessages log = MessagesFactory.get(GatewayMessages.class); - - private static RemoteConfigurationRegistryClientService remoteConfigRegistryClientService = null; - - public static void setClientService(RemoteConfigurationRegistryClientService clientService) { - remoteConfigRegistryClientService = clientService; - } - - private static RemoteConfigurationRegistryClientService getClientService() { - if (remoteConfigRegistryClientService == null) { - GatewayServices services = GatewayServer.getGatewayServices(); - if (services != null) { - remoteConfigRegistryClientService = services.getService(GatewayServices.REMOTE_REGISTRY_CLIENT_SERVICE); - } - } - - return remoteConfigRegistryClientService; - } - - /** - * - * @param config The GatewayConfig - * - * @return The first RemoteConfigurationMonitor extension that is found. - */ - public static RemoteConfigurationMonitor get(GatewayConfig config) { - RemoteConfigurationMonitor rcm = null; - - ServiceLoader<RemoteConfigurationMonitorProvider> providers = - ServiceLoader.load(RemoteConfigurationMonitorProvider.class); - for (RemoteConfigurationMonitorProvider provider : providers) { - try { - rcm = provider.newInstance(config, getClientService()); - if (rcm != null) { - break; - } - } catch (Exception e) { - log.remoteConfigurationMonitorInitFailure(e.getLocalizedMessage(), e); - } - } - - return rcm; - } - -} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/services/CLIGatewayServices.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/services/CLIGatewayServices.java b/gateway-server/src/main/java/org/apache/knox/gateway/services/CLIGatewayServices.java index a1ed549..f168d44 100644 --- a/gateway-server/src/main/java/org/apache/knox/gateway/services/CLIGatewayServices.java +++ b/gateway-server/src/main/java/org/apache/knox/gateway/services/CLIGatewayServices.java @@ -24,7 +24,7 @@ import org.apache.knox.gateway.descriptor.FilterParamDescriptor; import org.apache.knox.gateway.descriptor.ResourceDescriptor; import org.apache.knox.gateway.i18n.messages.MessagesFactory; import org.apache.knox.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceFactory; -import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService; import org.apache.knox.gateway.services.topology.impl.DefaultTopologyService; import org.apache.knox.gateway.services.security.impl.DefaultAliasService; import org.apache.knox.gateway.services.security.impl.DefaultCryptoService; http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java new file mode 100644 index 0000000..e7ef01d --- /dev/null +++ b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultClusterConfigurationMonitorService.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.services.topology.impl; + +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.ServiceLifecycleException; +import org.apache.knox.gateway.services.security.AliasService; +import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService; +import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor; +import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitorProvider; + +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + + +public class DefaultClusterConfigurationMonitorService implements ClusterConfigurationMonitorService { + + private AliasService aliasService = null; + + private Map<String, ClusterConfigurationMonitor> monitors = new HashMap<>(); + + @Override + public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException { + ServiceLoader<ClusterConfigurationMonitorProvider> providers = + ServiceLoader.load(ClusterConfigurationMonitorProvider.class); + for (ClusterConfigurationMonitorProvider provider : providers) { + // Check the gateway configuration to determine if this type of monitor is enabled + if (config.isClusterMonitorEnabled(provider.getType())) { + ClusterConfigurationMonitor monitor = provider.newInstance(config, aliasService); + if (monitor != null) { + monitors.put(provider.getType(), monitor); + } + } + } + } + + @Override + public void start() { + for (ClusterConfigurationMonitor monitor : monitors.values()) { + monitor.start(); + } + } + + @Override + public void stop() { + for (ClusterConfigurationMonitor monitor : monitors.values()) { + monitor.stop(); + } + } + + @Override + public ClusterConfigurationMonitor getMonitor(String type) { + return monitors.get(type); + } + + @Override + public void addListener(ClusterConfigurationMonitor.ConfigurationChangeListener listener) { + for (ClusterConfigurationMonitor monitor : monitors.values()) { + monitor.addListener(listener); + } + } + + public void setAliasService(AliasService aliasService) { + this.aliasService = aliasService; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java new file mode 100644 index 0000000..25bea08 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultConfigurationMonitorProvider.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.monitor; + +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService; + + +public class DefaultConfigurationMonitorProvider implements RemoteConfigurationMonitorProvider { + + @Override + public RemoteConfigurationMonitor newInstance(final GatewayConfig config, + final RemoteConfigurationRegistryClientService clientService) { + return new DefaultRemoteConfigurationMonitor(config, clientService); + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java new file mode 100644 index 0000000..efafee0 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.monitor; + +import org.apache.commons.io.FileUtils; +import org.apache.knox.gateway.GatewayMessages; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient.ChildEntryListener; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient.EntryListener; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService; +import org.apache.zookeeper.ZooDefs; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + + +class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor { + + private static final String NODE_KNOX = "/knox"; + private static final String NODE_KNOX_CONFIG = NODE_KNOX + "/config"; + private static final String NODE_KNOX_PROVIDERS = NODE_KNOX_CONFIG + "/shared-providers"; + private static final String NODE_KNOX_DESCRIPTORS = NODE_KNOX_CONFIG + "/descriptors"; + + private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class); + + // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported + private static final RemoteConfigurationRegistryClient.EntryACL AUTHENTICATED_USERS_ALL; + static { + AUTHENTICATED_USERS_ALL = new RemoteConfigurationRegistryClient.EntryACL() { + public String getId() { + return ""; + } + + public String getType() { + return "auth"; + } + + public Object getPermissions() { + return ZooDefs.Perms.ALL; + } + + public boolean canRead() { + return true; + } + + public boolean canWrite() { + return true; + } + }; + } + + private RemoteConfigurationRegistryClient client = null; + + private File providersDir; + private File descriptorsDir; + + /** + * @param config The gateway configuration + * @param registryClientService The service from which the remote registry client should be acquired. + */ + DefaultRemoteConfigurationMonitor(GatewayConfig config, + RemoteConfigurationRegistryClientService registryClientService) { + this.providersDir = new File(config.getGatewayProvidersConfigDir()); + this.descriptorsDir = new File(config.getGatewayDescriptorsDir()); + + if (registryClientService != null) { + String clientName = config.getRemoteConfigurationMonitorClientName(); + if (clientName != null) { + this.client = registryClientService.get(clientName); + if (this.client == null) { + log.unresolvedClientConfigurationForRemoteMonitoring(clientName); + } + } else { + log.missingClientConfigurationForRemoteMonitoring(); + } + } + } + + @Override + public void start() throws Exception { + if (client == null) { + throw new IllegalStateException("Failed to acquire a remote configuration registry client."); + } + + final String monitorSource = client.getAddress(); + log.startingRemoteConfigurationMonitor(monitorSource); + + // Ensure the existence of the expected entries and their associated ACLs + ensureEntries(); + + // Confirm access to the remote provider configs directory znode + List<String> providerConfigs = client.listChildEntries(NODE_KNOX_PROVIDERS); + if (providerConfigs == null) { + // Either the ZNode does not exist, or there is an authentication problem + throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_PROVIDERS); + } + + // Confirm access to the remote descriptors directory znode + List<String> descriptors = client.listChildEntries(NODE_KNOX_DESCRIPTORS); + if (descriptors == null) { + // Either the ZNode does not exist, or there is an authentication problem + throw new IllegalStateException("Unable to access remote path: " + NODE_KNOX_DESCRIPTORS); + } + + // Register a listener for provider config znode additions/removals + client.addChildEntryListener(NODE_KNOX_PROVIDERS, new ConfigDirChildEntryListener(providersDir)); + + // Register a listener for descriptor znode additions/removals + client.addChildEntryListener(NODE_KNOX_DESCRIPTORS, new ConfigDirChildEntryListener(descriptorsDir)); + + log.monitoringRemoteConfigurationSource(monitorSource); + } + + + @Override + public void stop() throws Exception { + client.removeEntryListener(NODE_KNOX_PROVIDERS); + client.removeEntryListener(NODE_KNOX_DESCRIPTORS); + } + + private void ensureEntries() { + ensureEntry(NODE_KNOX); + ensureEntry(NODE_KNOX_CONFIG); + ensureEntry(NODE_KNOX_PROVIDERS); + ensureEntry(NODE_KNOX_DESCRIPTORS); + } + + private void ensureEntry(String name) { + if (!client.entryExists(name)) { + client.createEntry(name); + } else { + // Validate the ACL + List<RemoteConfigurationRegistryClient.EntryACL> entryACLs = client.getACL(name); + for (RemoteConfigurationRegistryClient.EntryACL entryACL : entryACLs) { + // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported + // For now, check for ZooKeeper world:anyone with ANY permissions (even read-only) + if (entryACL.getType().equals("world") && entryACL.getId().equals("anyone")) { + log.suspectWritableRemoteConfigurationEntry(name); + + // If the client is authenticated, but "anyone" can write the content, then the content may not + // be trustworthy. + if (client.isAuthenticationConfigured()) { + log.correctingSuspectWritableRemoteConfigurationEntry(name); + + // Replace the existing ACL with one that permits only authenticated users + client.setACL(name, Collections.singletonList(AUTHENTICATED_USERS_ALL)); + } + } + } + } + } + + private static class ConfigDirChildEntryListener implements ChildEntryListener { + File localDir; + + ConfigDirChildEntryListener(File localDir) { + this.localDir = localDir; + } + + @Override + public void childEvent(RemoteConfigurationRegistryClient client, Type type, String path) { + File localFile = new File(localDir, path.substring(path.lastIndexOf("/") + 1)); + + switch (type) { + case REMOVED: + FileUtils.deleteQuietly(localFile); + log.deletedRemoteConfigFile(localDir.getName(), localFile.getName()); + try { + client.removeEntryListener(path); + } catch (Exception e) { + log.errorRemovingRemoteConfigurationListenerForPath(path, e); + } + break; + case ADDED: + try { + client.addEntryListener(path, new ConfigEntryListener(localDir)); + } catch (Exception e) { + log.errorAddingRemoteConfigurationListenerForPath(path, e); + } + break; + } + } + } + + private static class ConfigEntryListener implements EntryListener { + private File localDir; + + ConfigEntryListener(File localDir) { + this.localDir = localDir; + } + + @Override + public void entryChanged(RemoteConfigurationRegistryClient client, String path, byte[] data) { + File localFile = new File(localDir, path.substring(path.lastIndexOf("/"))); + if (data != null) { + try { + FileUtils.writeByteArrayToFile(localFile, data); + log.downloadedRemoteConfigFile(localDir.getName(), localFile.getName()); + } catch (IOException e) { + log.errorDownloadingRemoteConfiguration(path, e); + } + } else { + FileUtils.deleteQuietly(localFile); + log.deletedRemoteConfigFile(localDir.getName(), localFile.getName()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java new file mode 100644 index 0000000..d020532 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/knox/gateway/topology/monitor/RemoteConfigurationMonitorFactory.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.knox.gateway.topology.monitor; + +import org.apache.knox.gateway.GatewayMessages; +import org.apache.knox.gateway.GatewayServer; +import org.apache.knox.gateway.config.GatewayConfig; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; +import org.apache.knox.gateway.services.GatewayServices; +import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService; + +import java.util.ServiceLoader; + +public class RemoteConfigurationMonitorFactory { + + private static final GatewayMessages log = MessagesFactory.get(GatewayMessages.class); + + private static RemoteConfigurationRegistryClientService remoteConfigRegistryClientService = null; + + public static void setClientService(RemoteConfigurationRegistryClientService clientService) { + remoteConfigRegistryClientService = clientService; + } + + private static RemoteConfigurationRegistryClientService getClientService() { + if (remoteConfigRegistryClientService == null) { + GatewayServices services = GatewayServer.getGatewayServices(); + if (services != null) { + remoteConfigRegistryClientService = services.getService(GatewayServices.REMOTE_REGISTRY_CLIENT_SERVICE); + } + } + + return remoteConfigRegistryClientService; + } + + /** + * + * @param config The GatewayConfig + * + * @return The first RemoteConfigurationMonitor extension that is found. + */ + public static RemoteConfigurationMonitor get(GatewayConfig config) { + RemoteConfigurationMonitor rcm = null; + + ServiceLoader<RemoteConfigurationMonitorProvider> providers = + ServiceLoader.load(RemoteConfigurationMonitorProvider.class); + for (RemoteConfigurationMonitorProvider provider : providers) { + try { + rcm = provider.newInstance(config, getClientService()); + if (rcm != null) { + break; + } + } catch (Exception e) { + log.remoteConfigurationMonitorInitFailure(e.getLocalizedMessage(), e); + } + } + + return rcm; + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/java/org/apache/knox/gateway/util/KnoxCLI.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/util/KnoxCLI.java b/gateway-server/src/main/java/org/apache/knox/gateway/util/KnoxCLI.java index 9a87dd0..928c37e 100644 --- a/gateway-server/src/main/java/org/apache/knox/gateway/util/KnoxCLI.java +++ b/gateway-server/src/main/java/org/apache/knox/gateway/util/KnoxCLI.java @@ -1855,7 +1855,7 @@ public class KnoxCLI extends Configured implements Tool { static final String DESC = "Lists all of the remote configuration registry clients defined in gateway-site.xml.\n"; /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#execute() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#execute() */ @Override public void execute() throws Exception { @@ -1870,7 +1870,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#getUsage() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#getUsage() */ @Override public String getUsage() { @@ -1958,7 +1958,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#execute() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#execute() */ @Override public void execute() throws Exception { @@ -1966,7 +1966,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#getUsage() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#getUsage() */ @Override public String getUsage() { @@ -1987,7 +1987,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#execute() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#execute() */ @Override public void execute() throws Exception { @@ -1995,7 +1995,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#getUsage() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#getUsage() */ @Override public String getUsage() { @@ -2016,7 +2016,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#execute() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#execute() */ @Override public void execute() throws Exception { @@ -2039,7 +2039,7 @@ public class KnoxCLI extends Configured implements Tool { } /* (non-Javadoc) - * @see org.apache.hadoop.gateway.util.KnoxCLI.Command#getUsage() + * @see org.apache.knox.gateway.util.KnoxCLI.Command#getUsage() */ @Override public String getUsage() { http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorProvider ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorProvider b/gateway-server/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorProvider deleted file mode 100644 index bd4023e..0000000 --- a/gateway-server/src/main/resources/META-INF/services/org.apache.hadoop.gateway.topology.monitor.RemoteConfigurationMonitorProvider +++ /dev/null @@ -1,19 +0,0 @@ -########################################################################## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -########################################################################## - -org.apache.hadoop.gateway.topology.monitor.DefaultConfigurationMonitorProvider http://git-wip-us.apache.org/repos/asf/knox/blob/e766b3b7/gateway-server/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorProvider ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorProvider b/gateway-server/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorProvider new file mode 100644 index 0000000..63f438a --- /dev/null +++ b/gateway-server/src/main/resources/META-INF/services/org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorProvider @@ -0,0 +1,19 @@ +########################################################################## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################## + +org.apache.knox.gateway.topology.monitor.DefaultConfigurationMonitorProvider
