Repository: ambari Updated Branches: refs/heads/trunk 132cf31bc -> 43ae930b0
AMBARI-10811. Fix issues with config topology update in certain topologies which include HA and implicit MYSQL_SERVER component inclusion Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/43ae930b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/43ae930b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/43ae930b Branch: refs/heads/trunk Commit: 43ae930b0188f22913dcc67ec5c8ea168ff4deae Parents: 132cf31 Author: jspeidel <[email protected]> Authored: Tue Apr 28 20:43:59 2015 -0400 Committer: jspeidel <[email protected]> Committed: Tue Apr 28 20:45:13 2015 -0400 ---------------------------------------------------------------------- .../BlueprintConfigurationProcessor.java | 96 ++++++++++++++++---- .../topology/ClusterConfigurationRequest.java | 7 +- .../ambari/server/topology/TopologyManager.java | 25 +++-- .../BlueprintConfigurationProcessorTest.java | 48 +++++++++- 4 files changed, 142 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/43ae930b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java index 95e9807..7938cc1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java @@ -636,12 +636,12 @@ public class BlueprintConfigurationProcessor { * * @return new property value */ - public String updateForClusterCreate(String propertyName, + String updateForClusterCreate(String propertyName, String origValue, Map<String, Map<String, String>> properties, ClusterTopology topology); - public Collection<String> getRequiredHostGroups(String origValue, + Collection<String> getRequiredHostGroups(String origValue, Map<String, Map<String, String>> properties, ClusterTopology topology); } @@ -792,32 +792,79 @@ public class BlueprintConfigurationProcessor { return Collections.singleton(hostGroupName); } else { Collection<String> matchingGroups = topology.getHostGroupsForComponent(component); - if (matchingGroups.size() == 1) { + int matchingGroupCount = matchingGroups.size(); + if (matchingGroupCount == 1) { return Collections.singleton(matchingGroups.iterator().next()); } else { - if (topology.isNameNodeHAEnabled() && isComponentNameNode() && (matchingGroups.size() == 2)) { - // if this is the defaultFS property, it should reflect the nameservice name, - // rather than a hostname (used in non-HA scenarios) - if (properties.containsKey("core-site") && properties.get("core-site").get("fs.defaultFS").equals(origValue)) { - return Collections.emptySet(); + Cardinality cardinality = topology.getBlueprint().getStack().getCardinality(component); + // if no matching host groups are found for a component whose configuration + // is handled by this updater, return an empty set + if (matchingGroupCount == 0 && cardinality.isValidCount(0)) { + return Collections.emptySet(); + } else { + //todo: shouldn't have all of these hard coded HA rules here + if (topology.isNameNodeHAEnabled() && isComponentNameNode() && (matchingGroupCount == 2)) { + // if this is the defaultFS property, it should reflect the nameservice name, + // rather than a hostname (used in non-HA scenarios) + if (properties.containsKey("core-site") && properties.get("core-site").get("fs.defaultFS").equals(origValue)) { + return Collections.emptySet(); + } + + if (properties.containsKey("hbase-site") && properties.get("hbase-site").get("hbase.rootdir").equals(origValue)) { + // hbase-site's reference to the namenode is handled differently in HA mode, since the + // reference must point to the logical nameservice, rather than an individual namenode + return Collections.emptySet(); + } + + if (properties.containsKey("accumulo-site") && properties.get("accumulo-site").get("instance.volumes").equals(origValue)) { + // accumulo-site's reference to the namenode is handled differently in HA mode, since the + // reference must point to the logical nameservice, rather than an individual namenode + return Collections.emptySet(); + } + + if (!origValue.contains("localhost")) { + // if this NameNode HA property is a FDQN, then simply return it + return Collections.emptySet(); + } } - if (properties.containsKey("hbase-site") && properties.get("hbase-site").get("hbase.rootdir").equals(origValue)) { - // hbase-site's reference to the namenode is handled differently in HA mode, since the - // reference must point to the logical nameservice, rather than an individual namenode + if (topology.isNameNodeHAEnabled() && isComponentSecondaryNameNode() && (matchingGroupCount == 0)) { + // if HDFS HA is enabled, then no replacement is necessary for properties that refer to the SECONDARY_NAMENODE + // eventually this type of information should be encoded in the stacks return Collections.emptySet(); } - } - if (topology.isNameNodeHAEnabled() && isComponentSecondaryNameNode() && (matchingGroups.isEmpty())) { - // if HDFS HA is enabled, then no replacement is necessary for properties that refer to the SECONDARY_NAMENODE - // eventually this type of information should be encoded in the stacks - return Collections.emptySet(); - } + if (isYarnResourceManagerHAEnabled(properties) && isComponentResourceManager() && (matchingGroupCount == 2)) { + if (!origValue.contains("localhost")) { + // if this Yarn property is a FQDN, then simply return it + return Collections.emptySet(); + } + } - //todo: - throw new IllegalArgumentException("Unable to determine required host groups for component. " + - "Component '" + component + "' is not mapped to any host group or is mapped to multiple groups."); + if ((isOozieServerHAEnabled(properties)) && isComponentOozieServer() && (matchingGroupCount > 1)) { + if (!origValue.contains("localhost")) { + // if this Oozie property is a FQDN, then simply return it + return Collections.emptySet(); + } + } + + if ((isHiveServerHAEnabled(properties)) && isComponentHiveServer() && (matchingGroupCount > 1)) { + if (!origValue.contains("localhost")) { + // if this Hive property is a FQDN, then simply return it + return Collections.emptySet(); + } + } + + if ((isComponentHiveMetaStoreServer()) && matchingGroupCount > 1) { + if (!origValue.contains("localhost")) { + // if this Hive MetaStore property is a FQDN, then simply return it + return Collections.emptySet(); + } + } + //todo: property name + throw new IllegalArgumentException("Unable to update configuration property with topology information. " + + "Component '" + component + "' is not mapped to any host group or is mapped to multiple groups."); + } } } } @@ -999,6 +1046,15 @@ public class BlueprintConfigurationProcessor { } } + @Override + public Collection<String> getRequiredHostGroups(String origValue, Map<String, Map<String, String>> properties, ClusterTopology topology) { + if (isDatabaseManaged(properties)) { + return super.getRequiredHostGroups(origValue, properties, topology); + } else { + return Collections.emptySet(); + } + } + /** * Determine if database is managed, meaning that it is a component in the cluster topology. * http://git-wip-us.apache.org/repos/asf/ambari/blob/43ae930b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java index 1bffbf2..07ea50b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java @@ -68,7 +68,12 @@ public class ClusterConfigurationRequest { public void process() throws AmbariException, ConfigurationTopologyException { // this will update the topo cluster config and all host group configs in the cluster topology - configurationProcessor.doUpdateForClusterCreate(); + try { + configurationProcessor.doUpdateForClusterCreate(); + } catch (ConfigurationTopologyException e) { + //log and continue to set configs on cluster to make progress + LOG.error("An exception occurred while doing configuration topology update: " + e, e); + } setConfigurationsOnCluster(clusterTopology, "TOPOLOGY_RESOLVED"); } http://git-wip-us.apache.org/repos/asf/ambari/blob/43ae930b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 3e1b565..fb4baec 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -39,6 +39,8 @@ import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.host.HostImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -81,6 +83,8 @@ public class TopologyManager { private final static AtomicLong nextTaskId = new AtomicLong(10000); private final Object serviceResourceLock = new Object(); + protected final static Logger LOG = LoggerFactory.getLogger(TopologyManager.class); + public TopologyManager() { pendingTasks.put(TopologyTask.Type.CONFIGURE, new HashSet<TopologyTask>()); @@ -174,7 +178,7 @@ public class TopologyManager { if (! matchedToRequest) { synchronized (availableHosts) { - System.out.printf("TopologyManager: Queueing available host %s\n", hostName); + LOG.info("TopologyManager: Queueing available host {}", hostName); availableHosts.add(host); } } @@ -500,7 +504,7 @@ public class TopologyManager { @Override public void run() { - System.out.println("TopologyManager.ConfigureClusterTask: Entering"); + LOG.info("TopologyManager.ConfigureClusterTask: Entering"); boolean completed = false; boolean interrupted = false; @@ -520,25 +524,25 @@ public class TopologyManager { if (! interrupted) { try { - System.out.println("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster"); + LOG.info("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster"); // sets updated configuration on topology and cluster configRequest.process(); } catch (Exception e) { //todo: how to handle this? If this fails, we shouldn't start any hosts. - System.out.println("TopologyManager.ConfigureClusterTask: " + - "An exception occurred while attempting to process cluster configs and set on cluster"); + LOG.error("TopologyManager.ConfigureClusterTask: " + + "An exception occurred while attempting to process cluster configs and set on cluster: " + e); e.printStackTrace(); } synchronized (configurationFlagLock) { - System.out.println("TopologyManager.ConfigureClusterTask: Setting configure complete flag to true"); + LOG.info("TopologyManager.ConfigureClusterTask: Setting configure complete flag to true"); configureComplete = true; } // execute all queued install/start tasks executor.submit(new ExecuteQueuedHostTasks()); } - System.out.println("TopologyManager.ConfigureClusterTask: Exiting"); + LOG.info("TopologyManager.ConfigureClusterTask: Exiting"); } // get set of required host groups from config processor and confirm that all requests @@ -549,9 +553,10 @@ public class TopologyManager { try { requiredHostGroups = configRequest.getRequiredHostGroups(); } catch (RuntimeException e) { - //todo - System.out.println("Caught an error from Config Processor: " + e); - throw e; + //todo: for now if an exception occurs, log error and return true which will result in topology update + LOG.error("An exception occurred while attempting to determine required host groups for config update " + e); + e.printStackTrace(); + requiredHostGroups = Collections.emptyList(); } synchronized (outstandingRequests) { http://git-wip-us.apache.org/repos/asf/ambari/blob/43ae930b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java index 34b239b..7898473 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java @@ -48,6 +48,7 @@ import org.apache.ambari.server.topology.HostGroup; import org.apache.ambari.server.topology.HostGroupImpl; import org.apache.ambari.server.topology.HostGroupInfo; import org.apache.ambari.server.topology.InvalidTopologyException; +import org.apache.commons.collections.map.HashedMap; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -139,6 +140,8 @@ public class BlueprintConfigurationProcessorTest { expect(stack.getServiceForComponent(component)).andReturn(service).anyTimes(); } } + + expect(stack.getCardinality("MYSQL_SERVER")).andReturn(new Cardinality("0-1")).anyTimes(); } @After @@ -2182,7 +2185,7 @@ public class BlueprintConfigurationProcessorTest { // verify that the properties with hostname information was correctly preserved assertEquals("Yarn Log Server URL was incorrectly updated", - "http://" + expectedHostName +":19888/jobhistory/logs", yarnSiteProperties.get("yarn.log.server.url")); + "http://" + expectedHostName + ":19888/jobhistory/logs", yarnSiteProperties.get("yarn.log.server.url")); assertEquals("Yarn ResourceManager hostname was incorrectly exported", expectedHostName, yarnSiteProperties.get("yarn.resourcemanager.hostname")); assertEquals("Yarn ResourceManager tracker address was incorrectly updated", @@ -2910,7 +2913,7 @@ public class BlueprintConfigurationProcessorTest { expectedHostName + ":" + expectedPortNum, falconStartupProperties.get("*.broker.url")); assertEquals("Falcon Kerberos Principal property not properly exported", - "falcon/" + expectedHostName + "@EXAMPLE.COM", falconStartupProperties.get("*.falcon.service.authentication.kerberos.principal")); + "falcon/" + expectedHostName + "@EXAMPLE.COM", falconStartupProperties.get("*.falcon.service.authentication.kerberos.principal")); assertEquals("Falcon Kerberos HTTP Principal property not properly exported", "HTTP/" + expectedHostName + "@EXAMPLE.COM", falconStartupProperties.get("*.falcon.http.authentication.kerberos.principal")); @@ -3123,7 +3126,7 @@ public class BlueprintConfigurationProcessorTest { "localhost", stormSiteProperties.get("supervisor.childopts")); assertEquals("nimbus startup settings not properly handled by cluster create", - "localhost", stormSiteProperties.get("nimbus.childopts")); + "localhost", stormSiteProperties.get("nimbus.childopts")); assertEquals("Kafka ganglia host property not properly handled by cluster create", "localhost", kafkaBrokerProperties.get("kafka.ganglia.metrics.host")); @@ -3526,6 +3529,45 @@ public class BlueprintConfigurationProcessorTest { hdfsSiteProperties.get("dfs.namenode.shared.edits.dir")); } + @Test + public void testGetRequiredHostGroups___validComponentCountofZero() throws Exception { + Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>(); + Map<String, String> hiveSite = new HashMap<String, String>(); + properties.put("hive-site", hiveSite); + Map<String, String> hiveEnv = new HashMap<String, String>(); + properties.put("hive-env", hiveEnv); + + hiveSite.put("javax.jdo.option.ConnectionURL", "localhost:1111"); + // not the exact string but we are only looking for "New" + hiveEnv.put("hive_database", "New Database"); + + + Configuration clusterConfig = new Configuration(properties, + Collections.<String, Map<String, Map<String, String>>>emptyMap()); + + Collection<String> hgComponents1 = new HashSet<String>(); + hgComponents1.add("HIVE_SERVER"); + hgComponents1.add("NAMENODE"); + TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1")); + + Collection<String> hgComponents2 = new HashSet<String>(); + hgComponents2.add("DATANODE"); + TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, Collections.singleton("host2")); + + Collection<TestHostGroup> hostGroups = new ArrayList<TestHostGroup>(); + hostGroups.add(group1); + hostGroups.add(group2); + + ClusterTopology topology = createClusterTopology("c1", bp, clusterConfig, hostGroups); + BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology); + + // call top-level export method + Collection<String> requiredGroups = updater.getRequiredHostGroups(); + System.out.println("Required Groups: " + requiredGroups); + + + } + private static String createExportedAddress(String expectedPortNum, String expectedHostGroupName) { return createExportedHostName(expectedHostGroupName, expectedPortNum); }
