Repository: ambari Updated Branches: refs/heads/trunk f25834109 -> df2d8e3f8
AMBARI-11753. Invalid property value set in core-site.xml when KNOX HA is enabled (rlevas) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/df2d8e3f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/df2d8e3f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/df2d8e3f Branch: refs/heads/trunk Commit: df2d8e3f853b5ade5852ff5e18bbe8f9069f4e94 Parents: f258341 Author: Robert Levas <[email protected]> Authored: Mon Jun 8 06:14:18 2015 -0400 Committer: Robert Levas <[email protected]> Committed: Mon Jun 8 06:14:26 2015 -0400 ---------------------------------------------------------------------- .../server/controller/KerberosHelperImpl.java | 19 +- .../internal/ClientConfigResourceProvider.java | 63 +- .../apache/ambari/server/utils/StageUtils.java | 81 ++- .../KNOX/0.5.0.2.2/kerberos.json | 12 +- .../server/controller/KerberosHelperTest.java | 19 +- .../ambari/server/utils/StageUtilsTest.java | 727 +++++++++++++++++++ .../ambari/server/utils/TestStageUtils.java | 443 ----------- 7 files changed, 843 insertions(+), 521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java index 76054b7..8a5d4fd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java @@ -767,16 +767,19 @@ public class KerberosHelperImpl implements KerberosHelper { generalProperties.put("cluster_name", cluster.getClusterName()); // add clusterHostInfo config - Map<String, String> componentHosts = new HashMap<String, String>(); - for (Map.Entry<String, Service> service : cluster.getServices().entrySet()) { - for (Map.Entry<String, ServiceComponent> serviceComponent : service.getValue().getServiceComponents().entrySet()) { - if (StageUtils.getComponentToClusterInfoKeyMap().keySet().contains(serviceComponent.getValue().getName())) { - componentHosts.put(StageUtils.getComponentToClusterInfoKeyMap().get(serviceComponent.getValue().getName()), - StringUtils.join(serviceComponent.getValue().getServiceComponentHosts().keySet(), ",")); - } + Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(cluster); + + if(clusterHostInfo != null) { + Map<String, String> componentHosts = new HashMap<String, String>(); + + clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo); + + for (Map.Entry<String, Set<String>> entry : clusterHostInfo.entrySet()) { + componentHosts.put(entry.getKey(), StringUtils.join(entry.getValue(), ",")); } + + configurations.put("clusterHostInfo", componentHosts); } - configurations.put("clusterHostInfo", componentHosts); return configurations; } http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java index 2db2d28..c42814f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java @@ -273,7 +273,16 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv clusterHostInfo = StageUtils.getClusterHostInfo(cluster); serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(), stackId.getStackVersion(), serviceName); - clusterHostInfo = substituteHostIndexes(clusterHostInfo); + try { + clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo); + } catch (AmbariException e) { + // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the + // event an index could not be mapped to a host. After the move, this was changed to an + // AmbariException for consistency in the StageUtils class. To keep this method consistent + // with how it behaved in the past, if an AmbariException is thrown, it is caught and + // translated to a SystemException. + throw new SystemException(e.getMessage(), e); + } osFamily = clusters.getHost(hostName).getOsFamily(); TreeMap<String, String> hostLevelParams = new TreeMap<String, String>(); @@ -402,58 +411,6 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv return resources; } - private static Map<String, Set<String>> substituteHostIndexes(Map<String, Set<String>> clusterHostInfo) throws SystemException { - Set<String> keysToSkip = new HashSet<String>(Arrays.asList("all_hosts", "all_ping_ports", - "ambari_server_host", "all_racks", "all_ipv4_ips")); - String[] allHosts = {}; - if (clusterHostInfo.get("all_hosts") != null) { - allHosts = clusterHostInfo.get("all_hosts").toArray(new String[clusterHostInfo.get("all_hosts").size()]); - } - Set<String> keys = clusterHostInfo.keySet(); - for (String key : keys) { - if (keysToSkip.contains(key)) { - continue; - } - Set<String> hosts = new HashSet<String>(); - Set<String> currentHostsIndexes = clusterHostInfo.get(key); - if (currentHostsIndexes == null) { - continue; - } - for (String hostIndexRange : currentHostsIndexes) { - for (Integer hostIndex : rangeToSet(hostIndexRange)) { - try { - hosts.add(allHosts[hostIndex]); - } catch (ArrayIndexOutOfBoundsException ex) { - throw new SystemException("Failed to fill cluster host info ", ex); - } - } - } - clusterHostInfo.put(key, hosts); - } - return clusterHostInfo; - } - - private static Set<Integer> rangeToSet(String range) { - Set<Integer> indexSet = new HashSet<Integer>(); - int startIndex; - int endIndex; - if (range.contains("-")) { - startIndex = Integer.parseInt(range.split("-")[0]); - endIndex = Integer.parseInt(range.split("-")[1]); - } - else if (range.contains(",")) { - startIndex = Integer.parseInt(range.split(",")[0]); - endIndex = Integer.parseInt(range.split(",")[1]); - } - else { - startIndex = endIndex = Integer.parseInt(range); - } - for (int i=startIndex; i<=endIndex; i++) { - indexSet.add(i); - } - return indexSet; - } - @Override public RequestStatus updateResources(final Request request, Predicate predicate) throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java index aeca69b..3da0fe2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -72,11 +73,11 @@ public class StageUtils { public static final String DEFAULT_IPV4_ADDRESS = "127.0.0.1"; private static final Log LOG = LogFactory.getLog(StageUtils.class); - static final String AMBARI_SERVER_HOST = "ambari_server_host"; - private static final String HOSTS_LIST = "all_hosts"; - private static final String PORTS = "all_ping_ports"; - private static final String RACKS = "all_racks"; - private static final String IPV4_ADDRESSES = "all_ipv4_ips"; + protected static final String AMBARI_SERVER_HOST = "ambari_server_host"; + protected static final String HOSTS_LIST = "all_hosts"; + protected static final String PORTS = "all_ping_ports"; + protected static final String RACKS = "all_racks"; + protected static final String IPV4_ADDRESSES = "all_ipv4_ips"; private static Map<String, String> componentToClusterInfoKeyMap = new HashMap<String, String>(); private static Map<String, String> decommissionedToClusterInfoKeyMap = @@ -425,6 +426,49 @@ public class StageUtils { } /** + * Given a clusterHostInfo map, replaces host indexes with the mapped host names. + * <p/> + * If all_hosts was <code>["host1", "host2", "host3", "host4", "host5"]</code>, then a value of + * <code>["1-3", "5"]</code> for a given component would be converted to + * <code>["host1", "host2", "host3", "host5"]</code>. + * <p/> + * Operations are performed inplace, meaning a new clusterHostInfo map is not created and updated. + * + * @param clusterHostInfo the cluster host info map to perform the substitutions within + * @return the updated cluster host info map. + * @throws AmbariException if an index fails to map to a host name + */ + public static Map<String, Set<String>> substituteHostIndexes(Map<String, Set<String>> clusterHostInfo) throws AmbariException { + Set<String> keysToSkip = new HashSet<String>(Arrays.asList(HOSTS_LIST, PORTS, AMBARI_SERVER_HOST, RACKS, IPV4_ADDRESSES)); + String[] allHosts = {}; + if (clusterHostInfo.get(HOSTS_LIST) != null) { + allHosts = clusterHostInfo.get(HOSTS_LIST).toArray(new String[clusterHostInfo.get(HOSTS_LIST).size()]); + } + Set<String> keys = clusterHostInfo.keySet(); + for (String key : keys) { + if (keysToSkip.contains(key)) { + continue; + } + Set<String> hosts = new HashSet<String>(); + Set<String> currentHostsIndexes = clusterHostInfo.get(key); + if (currentHostsIndexes == null) { + continue; + } + for (String hostIndexRange : currentHostsIndexes) { + for (Integer hostIndex : rangeToSet(hostIndexRange)) { + try { + hosts.add(allHosts[hostIndex]); + } catch (ArrayIndexOutOfBoundsException ex) { + throw new AmbariException("Failed to fill cluster host info ", ex); + } + } + } + clusterHostInfo.put(key, hosts); + } + return clusterHostInfo; + } + + /** * Finds ranges in sorted set and replaces ranges by compact notation * <p/> * <p>For example, suppose <tt>set</tt> comprises<tt> [1, 2, 3, 4, 7]</tt>. @@ -497,6 +541,33 @@ public class StageUtils { return result; } + /** + * Splits a range to its explicit set of values. + * <p/> + * For example if the range is "1-5", the result will be [1, 2, 3, 4, 5] + * + * @param range the range to split + * @return a set of integers representing the original range + */ + private static Set<Integer> rangeToSet(String range) { + Set<Integer> indexSet = new HashSet<Integer>(); + int startIndex; + int endIndex; + if (range.contains("-")) { + startIndex = Integer.parseInt(range.split("-")[0]); + endIndex = Integer.parseInt(range.split("-")[1]); + } else if (range.contains(",")) { + startIndex = Integer.parseInt(range.split(",")[0]); + endIndex = Integer.parseInt(range.split(",")[1]); + } else { + startIndex = endIndex = Integer.parseInt(range); + } + for (int i = startIndex; i <= endIndex; i++) { + indexSet.add(i); + } + return indexSet; + } + private static String getRangedItem(Integer startOfRange, Integer endOfRange) { String separator = (endOfRange - startOfRange) > 1 ? "-" : ","; http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/main/resources/common-services/KNOX/0.5.0.2.2/kerberos.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KNOX/0.5.0.2.2/kerberos.json b/ambari-server/src/main/resources/common-services/KNOX/0.5.0.2.2/kerberos.json index 584a932..6a89af6 100644 --- a/ambari-server/src/main/resources/common-services/KNOX/0.5.0.2.2/kerberos.json +++ b/ambari-server/src/main/resources/common-services/KNOX/0.5.0.2.2/kerberos.json @@ -38,20 +38,20 @@ }, { "core-site": { - "hadoop.proxyuser.knox.groups": "${hadoop-env/proxyuser_group}", - "hadoop.proxyuser.knox.hosts": "${host}" + "hadoop.proxyuser.${knox-env/knox_user}.groups": "${hadoop-env/proxyuser_group}", + "hadoop.proxyuser.${knox-env/knox_user}.hosts": "${clusterHostInfo/knox_gateway_hosts}" } }, { "webhcat-site": { - "webhcat.proxyuser.knox.groups": "${hadoop-env/proxyuser_group}", - "webhcat.proxyuser.knox.hosts": "${host}" + "webhcat.proxyuser.${knox-env/knox_user}.groups": "${hadoop-env/proxyuser_group}", + "webhcat.proxyuser.${knox-env/knox_user}.hosts": "${clusterHostInfo/knox_gateway_hosts}" } }, { "oozie-site": { - "oozie.service.ProxyUserService.proxyuser.knox.groups": "${hadoop-env/proxyuser_group}", - "oozie.service.ProxyUserService.proxyuser.knox.hosts": "${host}" + "oozie.service.ProxyUserService.proxyuser.${knox-env/knox_user}.groups": "${hadoop-env/proxyuser_group}", + "oozie.service.ProxyUserService.proxyuser.${knox-env/knox_user}.hosts": "${clusterHostInfo/knox_gateway_hosts}" } } ] http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java index 5744b53..f8ba840 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java @@ -2824,6 +2824,14 @@ public class KerberosHelperTest extends EasyMockSupport { .andReturn(Collections.<String, ServiceComponent>emptyMap()) .anyTimes(); + final Map<String, Host> hostMap = new HashMap<String, Host>() { + { + put("host1", host1); + put("host2", host2); + } + }; + final Collection<Host> hosts = hostMap.values(); + final Cluster cluster = createMock(Cluster.class); expect(cluster.getSecurityType()).andReturn(clusterSecurityType).anyTimes(); expect(cluster.getClusterName()).andReturn(clusterName).anyTimes(); @@ -2857,18 +2865,17 @@ public class KerberosHelperTest extends EasyMockSupport { } }) .anyTimes(); + expect(cluster.getHosts()) + .andReturn(hosts) + .anyTimes(); + final Clusters clusters = injector.getInstance(Clusters.class); expect(clusters.getCluster(clusterName)).andReturn(cluster).times(1); if(hostName == null) { expect(clusters.getHostsForCluster(clusterName)) - .andReturn(new HashMap<String, Host>() { - { - put("host1", host1); - put("host2", host2); - } - }) + .andReturn(hostMap) .once(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java new file mode 100644 index 0000000..e85d9a1 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.utils; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.getCurrentArguments; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import javax.persistence.EntityManager; +import javax.xml.bind.JAXBException; + +import com.google.inject.AbstractModule; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper; +import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.agent.ExecutionCommand; +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.orm.DBAccessor; +import org.apache.ambari.server.security.SecurityHelper; +import org.apache.ambari.server.stack.StackManagerFactory; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.HostComponentAdminState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.cluster.ClusterFactory; +import org.apache.ambari.server.state.cluster.ClustersImpl; +import org.apache.ambari.server.state.host.HostFactory; +import org.apache.ambari.server.state.stack.OsFamily; +import org.apache.ambari.server.topology.TopologyManager; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.easymock.EasyMockSupport; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; +import com.google.common.collect.Range; +import com.google.gson.Gson; +import com.google.inject.Guice; +import com.google.inject.Injector; + +public class StageUtilsTest extends EasyMockSupport { + private static final String STACK_ID = "HDP-1.3.1"; + + private Injector injector; + + @Before + public void setup() throws Exception { + + injector = Guice.createInjector(new AbstractModule() { + + @Override + protected void configure() { + bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class)); + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(ClusterFactory.class).toInstance(createNiceMock(ClusterFactory.class)); + bind(HostFactory.class).toInstance(createNiceMock(HostFactory.class)); + bind(SecurityHelper.class).toInstance(createNiceMock(SecurityHelper.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + bind(TopologyManager.class).toInstance(createNiceMock(TopologyManager.class)); + bind(AmbariMetaInfo.class).toInstance(createMock(AmbariMetaInfo.class)); + bind(Clusters.class).toInstance(createNiceMock(ClustersImpl.class)); + bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class)); + bind(ServiceComponentHostFactory.class).toInstance(createNiceMock(ServiceComponentHostFactory.class)); + } + }); + + + StageUtils.setTopologyManager(injector.getInstance(TopologyManager.class)); + } + + + public static void addService(Cluster cl, List<String> hostList, + Map<String, List<Integer>> topology, String serviceName, + Injector injector) throws AmbariException { + ServiceComponentHostFactory serviceComponentHostFactory = injector.getInstance(ServiceComponentHostFactory.class); + + cl.setDesiredStackVersion(new StackId(STACK_ID)); + cl.addService(serviceName); + + for (Entry<String, List<Integer>> component : topology.entrySet()) { + String componentName = component.getKey(); + cl.getService(serviceName).addServiceComponent(componentName); + + for (Integer hostIndex : component.getValue()) { + cl.getService(serviceName) + .getServiceComponent(componentName) + .addServiceComponentHost( + serviceComponentHostFactory.createNew(cl.getService(serviceName) + .getServiceComponent(componentName), hostList.get(hostIndex))); + } + } + } + + @Test + @Ignore + public void testGetATestStage() { + Stage s = StageUtils.getATestStage(1, 2, "host2", "", "hostParamsStage"); + String hostname = s.getHosts().get(0); + List<ExecutionCommandWrapper> wrappers = s.getExecutionCommands(hostname); + for (ExecutionCommandWrapper wrapper : wrappers) { + assertEquals("cluster1", wrapper.getExecutionCommand().getClusterName()); + assertEquals(StageUtils.getActionId(1, 2), wrapper.getExecutionCommand().getCommandId()); + assertEquals(hostname, wrapper.getExecutionCommand().getHostname()); + } + } + + @Test + @Ignore + public void testJaxbToString() throws Exception { + Stage s = StageUtils.getATestStage(1, 2, "host1", "", "hostParamsStage"); + String hostname = s.getHosts().get(0); + List<ExecutionCommandWrapper> wrappers = s.getExecutionCommands(hostname); + for (ExecutionCommandWrapper wrapper : wrappers) { + // Why are we logging in test case? + // LOG.info("Command is " + StageUtils.jaxbToString(wrapper.getExecutionCommand())); + } + assertEquals(StageUtils.getActionId(1, 2), s.getActionId()); + } + + @Test + @Ignore + public void testJasonToExecutionCommand() throws JsonGenerationException, + JsonMappingException, JAXBException, IOException { + Stage s = StageUtils.getATestStage(1, 2, "host1", "clusterHostInfo", "hostParamsStage"); + ExecutionCommand cmd = s.getExecutionCommands("host1").get(0).getExecutionCommand(); + HashMap<String, Map<String, String>> configTags = new HashMap<String, Map<String, String>>(); + Map<String, String> globalTag = new HashMap<String, String>(); + globalTag.put("tag", "version1"); + configTags.put("global", globalTag); + cmd.setConfigurationTags(configTags); + String json = StageUtils.jaxbToString(cmd); + + InputStream is = new ByteArrayInputStream( + json.getBytes(Charset.forName("UTF8"))); + + ExecutionCommand cmdDes = new Gson().fromJson(new InputStreamReader(is), + ExecutionCommand.class); + + assertEquals(cmd.toString(), cmdDes.toString()); + assertEquals(cmd, cmdDes); + } + + @Test + public void testGetClusterHostInfo() throws Exception { + final HashMap<String, String> hostAttributes = new HashMap<String, String>() {{ + put("os_family", "redhat"); + put("os_release_version", "5.9"); + }}; + + final Clusters clusters = createNiceMock(Clusters.class); + + List<Host> hosts = new ArrayList<Host>(); + List<String> hostNames = new ArrayList<String>(); + + List<Integer> pingPorts = Arrays.asList(StageUtils.DEFAULT_PING_PORT, + StageUtils.DEFAULT_PING_PORT, + StageUtils.DEFAULT_PING_PORT, + 8671, + 8671, + null, + 8672, + 8672, + null, + 8673); + + + for (int i = 0; i < 10; i++) { + String hostname = String.format("h%d", i); + Host host = createNiceMock(Host.class); + expect(host.getHostName()).andReturn(hostname).anyTimes(); + expect(host.getHostAttributes()).andReturn(hostAttributes).anyTimes(); + expect(host.getCurrentPingPort()).andReturn(pingPorts.get(i)).anyTimes(); + + hosts.add(host); + hostNames.add(hostname); + + expect(clusters.getHost(hostname)).andReturn(host).anyTimes(); + } + + final ServiceComponentHost nnh0ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(nnh0ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost snnh1ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(snnh1ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh0ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh0ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh1ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh1ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh2ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh2ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh3ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh3ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh5ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh5ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh7ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh7ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh8ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh8ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost dnh9ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(dnh9ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbm5ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbm5ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbrs1ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbrs1ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbrs3ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbrs3ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbrs5ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbrs5ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbrs8ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbrs8ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost hbrs9ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(hbrs9ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrjt5ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrjt5ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrtt1ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt1ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrtt2ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt2ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.DECOMMISSIONED).anyTimes(); + + final ServiceComponentHost mrtt3ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt3ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.DECOMMISSIONED).anyTimes(); + + final ServiceComponentHost mrtt4ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt4ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrtt5ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt5ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrtt7ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt7ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost mrtt9ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(mrtt9ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + final ServiceComponentHost nns7ServiceComponentHost = createMock(ServiceComponentHost.class); + expect(nns7ServiceComponentHost.getComponentAdminState()).andReturn(HostComponentAdminState.INSERVICE).anyTimes(); + + Map<String, Collection<String>> projectedTopology = new HashMap<String, Collection<String>>(); + + + final HashMap<String, ServiceComponentHost> nnServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h0", nnh0ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "NAMENODE", nnServiceComponentHosts.keySet()); + + final HashMap<String, ServiceComponentHost> snnServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h1", snnh1ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "SECONDARY_NAMENODE", snnServiceComponentHosts.keySet()); + + final Map<String, ServiceComponentHost> dnServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h0", dnh0ServiceComponentHost); + put("h1", dnh1ServiceComponentHost); + put("h2", dnh2ServiceComponentHost); + put("h3", dnh3ServiceComponentHost); + put("h5", dnh5ServiceComponentHost); + put("h7", dnh7ServiceComponentHost); + put("h8", dnh8ServiceComponentHost); + put("h9", dnh9ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "DATANODE", dnServiceComponentHosts.keySet()); + + final Map<String, ServiceComponentHost> hbmServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h5", hbm5ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "HBASE_MASTER", hbmServiceComponentHosts.keySet()); + + final Map<String, ServiceComponentHost> hbrsServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h1", hbrs1ServiceComponentHost); + put("h3", hbrs3ServiceComponentHost); + put("h5", hbrs5ServiceComponentHost); + put("h8", hbrs8ServiceComponentHost); + put("h9", hbrs9ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "HBASE_REGIONSERVER", hbrsServiceComponentHosts.keySet()); + + final Map<String, ServiceComponentHost> mrjtServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h5", mrjt5ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "JOBTRACKER", mrjtServiceComponentHosts.keySet()); + + final Map<String, ServiceComponentHost> mrttServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h1", mrtt1ServiceComponentHost); + put("h2", mrtt2ServiceComponentHost); + put("h3", mrtt3ServiceComponentHost); + put("h4", mrtt4ServiceComponentHost); + put("h5", mrtt5ServiceComponentHost); + put("h7", mrtt7ServiceComponentHost); + put("h9", mrtt9ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "TASKTRACKER", mrttServiceComponentHosts.keySet()); + + + final Map<String, ServiceComponentHost> nnsServiceComponentHosts = new HashMap<String, ServiceComponentHost>() { + { + put("h7", nns7ServiceComponentHost); + } + }; + insertTopology(projectedTopology, "NONAME_SERVER", nnsServiceComponentHosts.keySet()); + + final ServiceComponent nnComponent = createMock(ServiceComponent.class); + expect(nnComponent.getName()).andReturn("NAMENODE").anyTimes(); + expect(nnComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return nnServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(nnComponent.getServiceComponentHosts()).andReturn(nnServiceComponentHosts).anyTimes(); + expect(nnComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent snnComponent = createMock(ServiceComponent.class); + expect(snnComponent.getName()).andReturn("SECONDARY_NAMENODE").anyTimes(); + expect(snnComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return snnServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(snnComponent.getServiceComponentHosts()).andReturn(snnServiceComponentHosts).anyTimes(); + expect(snnComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent dnComponent = createMock(ServiceComponent.class); + expect(dnComponent.getName()).andReturn("DATANODE").anyTimes(); + expect(dnComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return dnServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(dnComponent.getServiceComponentHosts()).andReturn(dnServiceComponentHosts).anyTimes(); + expect(dnComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent hbmComponent = createMock(ServiceComponent.class); + expect(hbmComponent.getName()).andReturn("HBASE_MASTER").anyTimes(); + expect(hbmComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return hbmServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(hbmComponent.getServiceComponentHosts()).andReturn(hbmServiceComponentHosts).anyTimes(); + expect(hbmComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent hbrsComponent = createMock(ServiceComponent.class); + expect(hbrsComponent.getName()).andReturn("HBASE_REGIONSERVER").anyTimes(); + expect(hbrsComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return hbrsServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(hbrsComponent.getServiceComponentHosts()).andReturn(hbrsServiceComponentHosts).anyTimes(); + expect(hbrsComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent mrjtComponent = createMock(ServiceComponent.class); + expect(mrjtComponent.getName()).andReturn("JOBTRACKER").anyTimes(); + expect(mrjtComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return mrjtServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(mrjtComponent.getServiceComponentHosts()).andReturn(mrjtServiceComponentHosts).anyTimes(); + expect(mrjtComponent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent mrttCompomnent = createMock(ServiceComponent.class); + expect(mrttCompomnent.getName()).andReturn("TASKTRACKER").anyTimes(); + expect(mrttCompomnent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return mrttServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(mrttCompomnent.getServiceComponentHosts()).andReturn(mrttServiceComponentHosts).anyTimes(); + expect(mrttCompomnent.isClientComponent()).andReturn(false).anyTimes(); + + final ServiceComponent nnsComponent = createMock(ServiceComponent.class); + expect(nnsComponent.getName()).andReturn("NONAME_SERVER").anyTimes(); + expect(nnsComponent.getServiceComponentHost(anyObject(String.class))) + .andAnswer(new IAnswer<ServiceComponentHost>() { + @Override + public ServiceComponentHost answer() throws Throwable { + Object[] args = getCurrentArguments(); + return nnsServiceComponentHosts.get((String) args[0]); + } + }).anyTimes(); + expect(nnsComponent.getServiceComponentHosts()).andReturn(nnsServiceComponentHosts).anyTimes(); + expect(nnsComponent.isClientComponent()).andReturn(false).anyTimes(); + + final Service hdfsService = createMock(Service.class); + expect(hdfsService.getServiceComponents()).andReturn(new HashMap<String, ServiceComponent>() {{ + put("NAMENODE", nnComponent); + put("SECONDARY_NAMENODE", snnComponent); + put("DATANODE", dnComponent); + }}).anyTimes(); + + final Service hbaseService = createMock(Service.class); + expect(hbaseService.getServiceComponents()).andReturn(new HashMap<String, ServiceComponent>() {{ + put("HBASE_MASTER", hbmComponent); + put("HBASE_REGIONSERVER", hbrsComponent); + }}).anyTimes(); + + final Service mrService = createMock(Service.class); + expect(mrService.getServiceComponents()).andReturn(new HashMap<String, ServiceComponent>() { + { + put("JOBTRACKER", mrjtComponent); + put("TASKTRACKER", mrttCompomnent); + } + }).anyTimes(); + + final Service nnService = createMock(Service.class); + expect(nnService.getServiceComponents()).andReturn(new HashMap<String, ServiceComponent>() { + { + put("NONAME_SERVER", nnsComponent); + } + }).anyTimes(); + + + Cluster cluster = createMock(Cluster.class); + expect(cluster.getHosts()).andReturn(hosts).anyTimes(); + expect(cluster.getServices()).andReturn(new HashMap<String, Service>() {{ + put("HDFS", hdfsService); + put("HBASE", hbaseService); + put("MAPREDUCE", mrService); + put("NONAME", nnService); + }}).anyTimes(); + + + final TopologyManager topologyManager = injector.getInstance(TopologyManager.class); + topologyManager.getProjectedTopology(); + expectLastCall().andReturn(projectedTopology).once(); + + replayAll(); + + // This is required by the infrastructure + injector.getInstance(AmbariMetaInfo.class).init(); + + //Get cluster host info + Map<String, Set<String>> info = StageUtils.getClusterHostInfo(cluster); + + verifyAll(); + + //All hosts present in cluster host info + Set<String> allHosts = info.get(StageUtils.HOSTS_LIST); + assertEquals(hosts.size(), allHosts.size()); + for (Host host : hosts) { + assertTrue(allHosts.contains(host.getHostName())); + } + + checkServiceHostIndexes(info, "DATANODE", "slave_hosts", projectedTopology, hostNames); + checkServiceHostIndexes(info, "NAMENODE", "namenode_host", projectedTopology, hostNames); + checkServiceHostIndexes(info, "SECONDARY_NAMENODE", "snamenode_host", projectedTopology, hostNames); + checkServiceHostIndexes(info, "HBASE_MASTER", "hbase_master_hosts", projectedTopology, hostNames); + checkServiceHostIndexes(info, "HBASE_REGIONSERVER", "hbase_rs_hosts", projectedTopology, hostNames); + checkServiceHostIndexes(info, "JOBTRACKER", "jtnode_host", projectedTopology, hostNames); + checkServiceHostIndexes(info, "TASKTRACKER", "mapred_tt_hosts", projectedTopology, hostNames); + checkServiceHostIndexes(info, "NONAME_SERVER", "noname_server_hosts", projectedTopology, hostNames); + + Set<String> actualPingPorts = info.get(StageUtils.PORTS); + if (pingPorts.contains(null)) { + assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size() + 1); + } else { + assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size()); + } + + List<Integer> pingPortsActual = getRangeMappedDecompressedSet(actualPingPorts); + List<Integer> reindexedPorts = getReindexedList(pingPortsActual, new ArrayList<String>(allHosts), hostNames); + + //Treat null values + List<Integer> expectedPingPorts = new ArrayList<Integer>(pingPorts); + for (int i = 0; i < expectedPingPorts.size(); i++) { + if (expectedPingPorts.get(i) == null) { + expectedPingPorts.set(i, StageUtils.DEFAULT_PING_PORT); + } + } + assertEquals(expectedPingPorts, reindexedPorts); + + assertTrue(info.containsKey("decom_tt_hosts")); + Set<String> decommissionedHosts = info.get("decom_tt_hosts"); + assertEquals(2, decommissionedHosts.toString().split(",").length); + + // check server hostname field + assertTrue(info.containsKey(StageUtils.AMBARI_SERVER_HOST)); + Set<String> serverHost = info.get(StageUtils.AMBARI_SERVER_HOST); + assertEquals(1, serverHost.size()); + assertEquals(StageUtils.getHostName(), serverHost.iterator().next()); + + // Validate substitutions... + info = StageUtils.substituteHostIndexes(info); + + checkServiceHostNames(info, "DATANODE", "slave_hosts", projectedTopology); + checkServiceHostNames(info, "NAMENODE", "namenode_host", projectedTopology); + checkServiceHostNames(info, "SECONDARY_NAMENODE", "snamenode_host", projectedTopology); + checkServiceHostNames(info, "HBASE_MASTER", "hbase_master_hosts", projectedTopology); + checkServiceHostNames(info, "HBASE_REGIONSERVER", "hbase_rs_hosts", projectedTopology); + checkServiceHostNames(info, "JOBTRACKER", "jtnode_host", projectedTopology); + checkServiceHostNames(info, "TASKTRACKER", "mapred_tt_hosts", projectedTopology); + checkServiceHostNames(info, "NONAME_SERVER", "noname_server_hosts", projectedTopology); + } + + private void insertTopology(Map<String, Collection<String>> projectedTopology, String componentName, Set<String> hostNames) { + if (hostNames != null) { + for (String hostname : hostNames) { + Collection<String> components = projectedTopology.get(hostname); + + if (components == null) { + components = new HashSet<String>(); + projectedTopology.put(hostname, components); + } + + components.add(componentName); + } + } + } + + private void checkServiceHostIndexes(Map<String, Set<String>> info, String componentName, String mappedComponentName, + Map<String, Collection<String>> serviceTopology, List<String> hostList) { + Set<Integer> expectedHostsList = new HashSet<Integer>(); + Set<Integer> actualHostsList = new HashSet<Integer>(); + + // Determine the expected hosts for a given component... + for (Entry<String, Collection<String>> entry : serviceTopology.entrySet()) { + if (entry.getValue().contains(componentName)) { + expectedHostsList.add(hostList.indexOf(entry.getKey())); + } + } + + // Determine the actual hosts for a given component... + Set<String> hosts = info.get(mappedComponentName); + if (hosts != null) { + actualHostsList.addAll(getDecompressedSet(hosts)); + } + + assertEquals(expectedHostsList, actualHostsList); + } + + private void checkServiceHostNames(Map<String, Set<String>> info, String componentName, String mappedComponentName, + Map<String, Collection<String>> serviceTopology) { + Set<String> expectedHostsList = new HashSet<String>(); + Set<String> actualHostsList = new HashSet<String>(); + + // Determine the expected hosts for a given component... + for (Entry<String, Collection<String>> entry : serviceTopology.entrySet()) { + if (entry.getValue().contains(componentName)) { + expectedHostsList.add(entry.getKey()); + } + } + + // Determine the actual hosts for a given component... + Set<String> hosts = info.get(mappedComponentName); + if (hosts != null) { + actualHostsList.addAll(hosts); + } + + assertEquals(expectedHostsList, actualHostsList); + } + + private Set<Integer> getDecompressedSet(Set<String> set) { + + Set<Integer> resultSet = new HashSet<Integer>(); + + for (String index : set) { + String[] ranges = index.split(","); + + for (String r : ranges) { + String[] split = r.split("-"); + + if (split.length == 2) { + Integer start = Integer.valueOf(split[0]); + Integer end = Integer.valueOf(split[1]); + ContiguousSet<Integer> rangeSet = ContiguousSet.create(Range.closed(start, end), DiscreteDomain.integers()); + + for (Integer i : rangeSet) { + resultSet.add(i); + } + + } else { + resultSet.add(Integer.valueOf(split[0])); + } + } + } + + return resultSet; + } + + private List<Integer> getRangeMappedDecompressedSet(Set<String> compressedSet) { + + SortedMap<Integer, Integer> resultMap = new TreeMap<Integer, Integer>(); + + for (String token : compressedSet) { + + String[] split = token.split(":"); + + if (split.length != 2) { + throw new RuntimeException("Broken data, expected format - m:r, got - " + + token); + } + + Integer index = Integer.valueOf(split[0]); + + String rangeTokens = split[1]; + + Set<String> rangeTokensSet = + new HashSet<String>(Arrays.asList(rangeTokens.split(","))); + + Set<Integer> decompressedSet = getDecompressedSet(rangeTokensSet); + + for (Integer i : decompressedSet) { + resultMap.put(i, index); + } + + } + + List<Integer> resultList = new ArrayList<Integer>(resultMap.values()); + + return resultList; + + } + + private List<Integer> getReindexedList(List<Integer> list, + List<String> currentIndexes, List<String> desiredIndexes) { + + SortedMap<Integer, Integer> sortedMap = new TreeMap<Integer, Integer>(); + + int index = 0; + + for (Integer value : list) { + String currentIndexValue = currentIndexes.get(index); + Integer desiredIndexValue = desiredIndexes.indexOf(currentIndexValue); + sortedMap.put(desiredIndexValue, value); + index++; + } + + return new ArrayList<Integer>(sortedMap.values()); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/df2d8e3f/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java deleted file mode 100644 index 8b6d2ae..0000000 --- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.server.utils; - -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.powermock.api.easymock.PowerMock.mockStaticPartial; -import static org.powermock.api.easymock.PowerMock.replayAll; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; - -import javax.xml.bind.JAXBException; - -import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper; -import org.apache.ambari.server.actionmanager.Stage; -import org.apache.ambari.server.agent.ExecutionCommand; -import org.apache.ambari.server.api.services.AmbariMetaInfo; -import org.apache.ambari.server.orm.GuiceJpaInitializer; -import org.apache.ambari.server.orm.InMemoryDefaultTestModule; -import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.Host; -import org.apache.ambari.server.state.HostComponentAdminState; -import org.apache.ambari.server.state.ServiceComponentHostFactory; -import org.apache.ambari.server.state.StackId; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import com.google.common.collect.ContiguousSet; -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.Range; -import com.google.gson.Gson; -import com.google.inject.Guice; -import com.google.inject.Injector; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(StageUtils.class) -@PowerMockIgnore("javax.management.*") -public class TestStageUtils { - private static final String HOSTS_LIST = "all_hosts"; - - private static final String STACK_ID = "HDP-1.3.1"; - - private static Log LOG = LogFactory.getLog(TestStageUtils.class); - - private AmbariMetaInfo ambariMetaInfo; - - private Injector injector; - - static ServiceComponentHostFactory serviceComponentHostFactory; - - @Before - public void setup() throws Exception { - injector = Guice.createInjector(new InMemoryDefaultTestModule()); - injector.getInstance(GuiceJpaInitializer.class); - serviceComponentHostFactory = injector.getInstance(ServiceComponentHostFactory.class); - ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class); - } - - - public static void addService(Cluster cl, List<String> hostList, - Map<String, List<Integer>> topology, String serviceName, - Injector injector) throws AmbariException { - cl.setDesiredStackVersion(new StackId(STACK_ID)); - cl.addService(serviceName); - - for (Entry<String, List<Integer>> component : topology.entrySet()) { - - String componentName = component.getKey(); - cl.getService(serviceName).addServiceComponent(componentName); - - for (Integer hostIndex : component.getValue()) { - cl.getService(serviceName) - .getServiceComponent(componentName) - .addServiceComponentHost( - serviceComponentHostFactory.createNew(cl.getService(serviceName) - .getServiceComponent(componentName), hostList.get(hostIndex))); - } - } - } - - @Test - @Ignore - public void testGetATestStage() { - Stage s = StageUtils.getATestStage(1, 2, "host2", "", "hostParamsStage"); - String hostname = s.getHosts().get(0); - List<ExecutionCommandWrapper> wrappers = s.getExecutionCommands(hostname); - for (ExecutionCommandWrapper wrapper : wrappers) { - assertEquals("cluster1", wrapper.getExecutionCommand().getClusterName()); - assertEquals(StageUtils.getActionId(1, 2), wrapper.getExecutionCommand().getCommandId()); - assertEquals(hostname, wrapper.getExecutionCommand().getHostname()); - } - } - - @Test - @Ignore - public void testJaxbToString() throws Exception { - Stage s = StageUtils.getATestStage(1, 2, "host1", "", "hostParamsStage"); - String hostname = s.getHosts().get(0); - List<ExecutionCommandWrapper> wrappers = s.getExecutionCommands(hostname); - for (ExecutionCommandWrapper wrapper : wrappers) { - LOG.info("Command is " + StageUtils.jaxbToString(wrapper.getExecutionCommand())); - } - assertEquals(StageUtils.getActionId(1, 2), s.getActionId()); - } - - @Test - @Ignore - public void testJasonToExecutionCommand() throws JsonGenerationException, - JsonMappingException, JAXBException, IOException { - Stage s = StageUtils.getATestStage(1, 2, "host1", "clusterHostInfo", "hostParamsStage"); - ExecutionCommand cmd = s.getExecutionCommands("host1").get(0).getExecutionCommand(); - HashMap<String, Map<String,String>> configTags = new HashMap<String, Map<String,String>>(); - Map<String, String> globalTag = new HashMap<String, String>(); - globalTag.put("tag", "version1"); - configTags.put("global", globalTag ); - cmd.setConfigurationTags(configTags); - String json = StageUtils.jaxbToString(cmd); - - InputStream is = new ByteArrayInputStream( - json.getBytes(Charset.forName("UTF8"))); - - ExecutionCommand cmdDes = new Gson().fromJson(new InputStreamReader(is), - ExecutionCommand.class); - - assertEquals(cmd.toString(), cmdDes.toString()); - assertEquals(cmd, cmdDes); - } - - @Test - @Ignore - public void testGetClusterHostInfo() throws AmbariException, UnknownHostException { - Clusters fsm = injector.getInstance(Clusters.class); - String h0 = "h0"; - - List<String> hostList = new ArrayList<String>(); - hostList.add("h1"); - hostList.add("h2"); - hostList.add("h3"); - hostList.add("h4"); - hostList.add("h5"); - hostList.add("h6"); - hostList.add("h7"); - hostList.add("h8"); - hostList.add("h9"); - hostList.add("h10"); - - mockStaticPartial(StageUtils.class, "getHostName"); - expect(StageUtils.getHostName()).andReturn(h0).anyTimes(); - replayAll(); - - List<Integer> pingPorts = Arrays.asList(StageUtils.DEFAULT_PING_PORT, - StageUtils.DEFAULT_PING_PORT, - StageUtils.DEFAULT_PING_PORT, - 8671, - 8671, - null, - 8672, - 8672, - null, - 8673); - - StackId stackId = new StackId(STACK_ID); - fsm.addCluster("c1", stackId); - - int index = 0; - - for (String host: hostList) { - fsm.addHost(host); - - Map<String, String> hostAttributes = new HashMap<String, String>(); - hostAttributes.put("os_family", "redhat"); - hostAttributes.put("os_release_version", "5.9"); - fsm.getHost(host).setHostAttributes(hostAttributes); - - fsm.getHost(host).setCurrentPingPort(pingPorts.get(index)); - fsm.getHost(host).persist(); - fsm.mapHostToCluster(host, "c1"); - index++; - } - - //Add HDFS service - Map<String, List<Integer>> hdfsTopology = new HashMap<String, List<Integer>>(); - hdfsTopology.put("NAMENODE", Collections.singletonList(0)); - hdfsTopology.put("SECONDARY_NAMENODE", Collections.singletonList(1)); - List<Integer> datanodeIndexes = Arrays.asList(0,1,2,3,5,7,8,9); - hdfsTopology.put("DATANODE", new ArrayList<Integer>(datanodeIndexes)); - addService(fsm.getCluster("c1"), hostList, hdfsTopology , "HDFS", injector); - - //Add HBASE service - Map<String, List<Integer>> hbaseTopology = new HashMap<String, List<Integer>>(); - hbaseTopology.put("HBASE_MASTER", Collections.singletonList(5)); - List<Integer> regionServiceIndexes = Arrays.asList(1,3,5,8,9); - hbaseTopology.put("HBASE_REGIONSERVER", regionServiceIndexes); - addService(fsm.getCluster("c1"), hostList, hbaseTopology , "HBASE", injector); - - //Add MAPREDUCE service - Map<String, List<Integer>> mrTopology = new HashMap<String, List<Integer>>(); - mrTopology.put("JOBTRACKER", Collections.singletonList(5)); - List<Integer> taskTrackerIndexes = Arrays.asList(1,2,3,4,5,7,9); - mrTopology.put("TASKTRACKER", taskTrackerIndexes); - addService(fsm.getCluster("c1"), hostList, mrTopology , "MAPREDUCE", injector); - - - //Add NONAME service - Map<String, List<Integer>> nonameTopology = new HashMap<String, List<Integer>>(); - nonameTopology.put("NONAME_SERVER", Collections.singletonList(7)); - addService(fsm.getCluster("c1"), hostList, nonameTopology , "NONAME", injector); - - fsm.getCluster("c1").getService("MAPREDUCE").getServiceComponent("TASKTRACKER").getServiceComponentHost("h2") - .setComponentAdminState(HostComponentAdminState.DECOMMISSIONED); - fsm.getCluster("c1").getService("MAPREDUCE").getServiceComponent("TASKTRACKER").getServiceComponentHost("h3") - .setComponentAdminState(HostComponentAdminState.DECOMMISSIONED); - - //Get cluster host info - Map<String, Set<String>> info = - StageUtils.getClusterHostInfo(fsm.getCluster("c1")); - - //All hosts present in cluster host info - Set<String> allHosts = info.get(HOSTS_LIST); - ArrayList<String> allHostsList = new ArrayList<String>(allHosts); - assertEquals(fsm.getHosts().size(), allHosts.size()); - for (Host host: fsm.getHosts()) { - assertTrue(allHosts.contains(host.getHostName())); - } - - - //Check HDFS topology compression - Map<String, String> hdfsMapping = new HashMap<String, String>(); - hdfsMapping.put("DATANODE", "slave_hosts"); - hdfsMapping.put("NAMENODE", "namenode_host"); - hdfsMapping.put("SECONDARY_NAMENODE", "snamenode_host"); - checkServiceCompression(info, hdfsMapping, hdfsTopology, hostList); - - - //Check HBASE topology compression - Map<String, String> hbaseMapping = new HashMap<String, String>(); - hbaseMapping.put("HBASE_MASTER", "hbase_master_hosts"); - hbaseMapping.put("HBASE_REGIONSERVER", "hbase_rs_hosts"); - checkServiceCompression(info, hbaseMapping, hbaseTopology, hostList); - - //Check MAPREDUCE topology compression - Map<String, String> mrMapping = new HashMap<String, String>(); - mrMapping.put("JOBTRACKER", "jtnode_host"); - mrMapping.put("TASKTRACKER", "mapred_tt_hosts"); - checkServiceCompression(info, mrMapping, mrTopology, hostList); - - Set<String> actualPingPorts = info.get("all_ping_ports"); - - if (pingPorts.contains(null)) { - assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size() + 1); - } else { - assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size()); - } - - List<Integer> pingPortsActual = getRangeMappedDecompressedSet(actualPingPorts); - - List<Integer> reindexedPorts = getReindexedList(pingPortsActual, new ArrayList<String>(allHosts), hostList); - - //Treat null values - while (pingPorts.contains(null)) { - pingPorts.remove(null); - pingPorts.add(StageUtils.DEFAULT_PING_PORT); - } - - assertEquals(pingPorts, reindexedPorts); - - // check for no-name in the list - assertTrue(info.containsKey("noname_server_hosts")); - assertTrue(info.containsKey("decom_tt_hosts")); - Set<String> decommissionedHosts = info.get("decom_tt_hosts"); - assertEquals(2, decommissionedHosts.toString().split(",").length); - - // check server hostname field - assertTrue(info.containsKey(StageUtils.AMBARI_SERVER_HOST)); - Set<String> serverHost = info.get(StageUtils.AMBARI_SERVER_HOST); - assertEquals(1, serverHost.size()); - assertEquals(h0, serverHost.iterator().next()); - } - - private void checkServiceCompression(Map<String, Set<String>> info, - Map<String, String> serviceMapping, Map<String, List<Integer>> serviceTopology, - List<String> hostList) { - - - for (Entry<String, List<Integer>> component: serviceTopology.entrySet()) { - - String componentName = component.getKey(); - - List<Integer> componentIndexesExpected = component.getValue(); - - String roleName = serviceMapping.get(componentName); - - assertTrue("No mapping for " + componentName , roleName != null); - - Set<Integer> componentIndexesActual = getDecompressedSet(info.get(roleName)); - - Set<String> expectedComponentHosts = new HashSet<String>(); - - for (Integer i: componentIndexesExpected) { - expectedComponentHosts.add(hostList.get(i)); - } - - Set<String> actualSlavesHosts = new HashSet<String>(); - - for (Integer i: componentIndexesActual) { - actualSlavesHosts.add(new ArrayList<String>(info.get(HOSTS_LIST)).get(i)); - } - - - - assertEquals(expectedComponentHosts, actualSlavesHosts); - - } - - } - - private Set<Integer> getDecompressedSet(Set<String> set) { - - Set<Integer> resultSet = new HashSet<Integer>(); - - for (String index : set) { - - String[] ranges = index.split(","); - - for (String r : ranges) { - - String[] split = r.split("-"); - - if (split.length == 2) { - Integer start = Integer.valueOf(split[0]); - Integer end = Integer.valueOf(split[1]); - ContiguousSet<Integer> rangeSet = - ContiguousSet.create(Range.closed(start, end), DiscreteDomain.integers()) ; - - for (Integer i : rangeSet) { - resultSet.add(i); - - } - - } else { - resultSet.add(Integer.valueOf(split[0])); - } - } - - } - return resultSet; - } - - private List<Integer> getRangeMappedDecompressedSet(Set<String> compressedSet) { - - SortedMap<Integer, Integer> resultMap = new TreeMap<Integer, Integer>(); - - for (String token : compressedSet) { - - String[] split = token.split(":"); - - if (split.length != 2) { - throw new RuntimeException("Broken data, expected format - m:r, got - " - + token); - } - - Integer index = Integer.valueOf(split[0]); - - String rangeTokens = split[1]; - - Set<String> rangeTokensSet = - new HashSet<String>(Arrays.asList(rangeTokens.split(","))); - - Set<Integer> decompressedSet = getDecompressedSet(rangeTokensSet); - - for (Integer i : decompressedSet) { - resultMap.put(i, index); - } - - } - - List<Integer> resultList = new ArrayList<Integer>(resultMap.values()); - - return resultList; - - } - - private List<Integer> getReindexedList(List<Integer> list, - List<String> currentIndexes, List<String> desiredIndexes) { - - SortedMap<Integer, Integer> sortedMap = new TreeMap<Integer, Integer>(); - - int index = 0; - - for (Integer value : list) { - String currentIndexValue = currentIndexes.get(index); - Integer desiredIndexValue = desiredIndexes.indexOf(currentIndexValue); - sortedMap.put(desiredIndexValue, value); - index++; - } - - return new ArrayList<Integer>(sortedMap.values()); - } - -}
