AMBARI-22159. Replace hostgroup vars for Druid
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75465a83 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75465a83 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75465a83 Branch: refs/heads/branch-feature-AMBARI-20859 Commit: 75465a83bd743bb3a2fa74acf30cfca4d0a2287c Parents: f1c4626 Author: Attila Doroszlai <adorosz...@hortonworks.com> Authored: Mon Oct 9 14:40:02 2017 +0200 Committer: Attila Doroszlai <adorosz...@hortonworks.com> Committed: Mon Oct 9 18:39:22 2017 +0200 ---------------------------------------------------------------------- .../BlueprintConfigurationProcessor.java | 81 +++++++++++++++----- .../BlueprintConfigurationProcessorTest.java | 32 ++++++++ 2 files changed, 92 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/75465a83/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java index 5a6e2cc..03f84a5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java @@ -32,6 +32,7 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -1367,11 +1368,56 @@ public class BlueprintConfigurationProcessor { ClusterTopology topology); } + private static class HostGroupUpdater implements PropertyUpdater { + + public static final PropertyUpdater INSTANCE = new HostGroupUpdater(); + + @Override + public String updateForClusterCreate(String propertyName, + String origValue, + Map<String, Map<String, String>> properties, + ClusterTopology topology) { + + //todo: getHostStrings + Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue); + if (m.find()) { + String hostGroupName = m.group(1); + + HostGroupInfo groupInfo = topology.getHostGroupInfo().get(hostGroupName); + if (groupInfo == null) { + //todo: this should be validated in configuration validation + throw new RuntimeException( + "Encountered a host group token in configuration which couldn't be matched to a host group: " + + hostGroupName); + } + + //todo: warn if > hosts + return origValue.replace(m.group(0), groupInfo.getHostNames().iterator().next()); + } + + return origValue; + } + + @Override + public Collection<String> getRequiredHostGroups(String propertyName, + String origValue, + Map<String, Map<String, String>> properties, + ClusterTopology topology) { + //todo: getHostStrings + Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue); + if (m.find()) { + String hostGroupName = m.group(1); + return Collections.singleton(hostGroupName); + } + return Collections.emptySet(); + } + } + /** * Topology based updater which replaces the original host name of a property with the host name * which runs the associated (master) component in the new cluster. */ - private static class SingleHostTopologyUpdater implements PropertyUpdater { + private static class SingleHostTopologyUpdater extends HostGroupUpdater { /** * Component name */ @@ -1402,21 +1448,9 @@ public class BlueprintConfigurationProcessor { Map<String, Map<String, String>> properties, ClusterTopology topology) { - //todo: getHostStrings - Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue); - if (m.find()) { - String hostGroupName = m.group(1); - - HostGroupInfo groupInfo = topology.getHostGroupInfo().get(hostGroupName); - if (groupInfo == null) { - //todo: this should be validated in configuration validation - throw new RuntimeException( - "Encountered a host group token in configuration which couldn't be matched to a host group: " - + hostGroupName); - } - - //todo: warn if > hosts - return origValue.replace(m.group(0), groupInfo.getHostNames().iterator().next()); + String replacedValue = super.updateForClusterCreate(propertyName, origValue, properties, topology); + if (!Objects.equals(origValue, replacedValue)) { + return replacedValue; } else { int matchingGroupCount = topology.getHostGroupsForComponent(component).size(); if (matchingGroupCount == 1) { @@ -1525,11 +1559,9 @@ public class BlueprintConfigurationProcessor { String origValue, Map<String, Map<String, String>> properties, ClusterTopology topology) { - //todo: getHostStrings - Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue); - if (m.find()) { - String hostGroupName = m.group(1); - return Collections.singleton(hostGroupName); + Collection<String> result = super.getRequiredHostGroups(propertyName, origValue, properties, topology); + if (!result.isEmpty()) { + return result; } else { Collection<String> matchingGroups = topology.getHostGroupsForComponent(component); int matchingGroupCount = matchingGroups.size(); @@ -2351,6 +2383,7 @@ public class BlueprintConfigurationProcessor { allUpdaters.add(nonTopologyUpdaters); Map<String, PropertyUpdater> amsSiteMap = new HashMap<>(); + Map<String, PropertyUpdater> druidCommon = new HashMap<>(); Map<String, PropertyUpdater> hdfsSiteMap = new HashMap<>(); Map<String, PropertyUpdater> mapredSiteMap = new HashMap<>(); Map<String, PropertyUpdater> coreSiteMap = new HashMap<>(); @@ -2404,6 +2437,7 @@ public class BlueprintConfigurationProcessor { Map<String, PropertyUpdater> zookeeperEnvMap = new HashMap<>(); singleHostTopologyUpdaters.put("ams-site", amsSiteMap); + singleHostTopologyUpdaters.put("druid-common", druidCommon); singleHostTopologyUpdaters.put("hdfs-site", hdfsSiteMap); singleHostTopologyUpdaters.put("mapred-site", mapredSiteMap); singleHostTopologyUpdaters.put("core-site", coreSiteMap); @@ -2775,6 +2809,11 @@ public class BlueprintConfigurationProcessor { } } }); + + // DRUID + druidCommon.put("metastore_hostname", HostGroupUpdater.INSTANCE); + druidCommon.put("druid.metadata.storage.connector.connectURI", HostGroupUpdater.INSTANCE); + druidCommon.put("druid.zk.service.host", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER")); } private static void addUnitPropertyUpdaters() { http://git-wip-us.apache.org/repos/asf/ambari/blob/75465a83/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java index 68d6349..d137f2c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java @@ -88,6 +88,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * BlueprintConfigurationProcessor unit tests. @@ -7933,6 +7934,37 @@ public class BlueprintConfigurationProcessorTest extends EasyMockSupport { assertEquals(someString, metricsReporterRegister); } + @Test + public void druidProperties() throws Exception { + Map<String, Map<String, String>> properties = new HashMap<>(); + Map<String, String> druidCommon = new HashMap<>(); + String connectUriKey = "druid.metadata.storage.connector.connectURI"; + String metastoreHostnameKey = "metastore_hostname"; + String connectUriTemplate = "jdbc:mysql://%s:3306/druid?createDatabaseIfNotExist=true"; + druidCommon.put(connectUriKey, String.format(connectUriTemplate, "%HOSTGROUP::group1%")); + druidCommon.put(metastoreHostnameKey, "%HOSTGROUP::group1%"); + properties.put("druid-common", druidCommon); + + Map<String, Map<String, String>> parentProperties = new HashMap<>(); + Configuration parentClusterConfig = new Configuration(parentProperties, Collections.<String, Map<String, Map<String, String>>>emptyMap()); + Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig); + + Collection<String> hgComponents1 = Sets.newHashSet("DRUID_COORDINATOR"); + TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1")); + + Collection<String> hgComponents2 = Sets.newHashSet("DRUID_BROKER", "DRUID_OVERLORD", "DRUID_ROUTER"); + TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, Collections.singleton("host2")); + + Collection<TestHostGroup> hostGroups = Arrays.asList(group1, group2); + + ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); + BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology); + + configProcessor.doUpdateForClusterCreate(); + + assertEquals(String.format(connectUriTemplate, "host1"), clusterConfig.getPropertyValue("druid-common", connectUriKey)); + assertEquals("host1", clusterConfig.getPropertyValue("druid-common", metastoreHostnameKey)); + } @Test public void testAmsPropertiesDefault() throws Exception {