Repository: ambari Updated Branches: refs/heads/trunk 5bdd6cf7e -> 326cc1b2a
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py index 4e4ef51..c7d9327 100644 --- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py @@ -37,6 +37,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): self.modifyHeapSizeProperties() self.modifyNotValuableComponents() self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -79,6 +80,32 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): """ self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS']) + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + + Must be overriden in child class. 
+ """ + self.componentLayoutSchemes = { + 'NAMENODE': {"else": 0}, + 'SECONDARY_NAMENODE': {"else": 1}, + 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, + + 'HISTORYSERVER': {31: 1, "else": 2}, + 'RESOURCEMANAGER': {31: 1, "else": 2}, + + 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, + + 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, + 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, + 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, + 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, + 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} + } + def getComponentLayoutValidations(self, services, hosts): """Returns array of Validation objects about issues with hostnames components assigned to""" items = [] @@ -561,26 +588,6 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor): {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}] return self.toConfigurationValidationProblems(validationItems, "hbase-env") - - # TODO, move to Service Advisors. - def getComponentLayoutSchemes(self): - return { - 'NAMENODE': {"else": 0}, - 'SECONDARY_NAMENODE': {"else": 1}, - 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, - - 'HISTORYSERVER': {31: 1, "else": 2}, - 'RESOURCEMANAGER': {31: 1, "else": 2}, - - 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, - - 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, - 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, - 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, - 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, - 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} - } - def getHostsWithComponent(self, serviceName, componentName, services, hosts): if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] 
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py index 26292d9..b72f046 100644 --- a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py @@ -33,27 +33,6 @@ def getSiteProperties(configurations, siteName): return None return siteConfig.get("properties") -def getPort(address): - """ - Extracts port from the address like 0.0.0.0:1019 - """ - if address is None: - return None - m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address) - if m is not None: - return int(m.group(2)) - else: - return None - -def isSecurePort(port): - """ - Returns True if port is root-owned at *nix systems - """ - if port is not None: - return port < 1024 - else: - return False - class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor): def __init__(self): @@ -822,15 +801,15 @@ class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor): data_transfer_protection = 'dfs.data.transfer.protection' try: # Params may be absent - privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address])) + privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address])) except KeyError: privileged_dfs_dn_port = False try: - privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address])) + privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address])) except KeyError: privileged_dfs_http_port = False try: - privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address])) + privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address])) except KeyError: 
privileged_dfs_https_port = False try: http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py index cc556c6..6a92934 100644 --- a/ambari-server/src/main/resources/stacks/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/stack_advisor.py @@ -23,8 +23,11 @@ import os import re import socket import traceback +from math import ceil, floor +from urlparse import urlparse # Local imports +from resource_management.libraries.functions.data_structure_utils import get_from_dict from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger @@ -332,6 +335,7 @@ class DefaultStackAdvisor(StackAdvisor): self.notValuableComponents = set() self.notPreferableOnServerComponents = set() self.cardinalitiesDict = {} + self.componentLayoutSchemes = {} self.loaded_service_advisors = False @@ -878,7 +882,99 @@ class DefaultStackAdvisor(StackAdvisor): return items def getConfigurationClusterSummary(self, servicesList, hosts, components, services): - return {} + """ + Copied from HDP 2.0.6 so that it could be used by Service Advisors. + :return: Dictionary of memory and CPU attributes in the cluster + """ + hBaseInstalled = False + if 'HBASE' in servicesList: + hBaseInstalled = True + + cluster = { + "cpu": 0, + "disk": 0, + "ram": 0, + "hBaseInstalled": hBaseInstalled, + "components": components + } + + if len(hosts["items"]) > 0: + nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) + # NodeManager host with least memory is generally used in calculations as it will work in larger hosts. 
+ if nodeManagerHosts is not None and len(nodeManagerHosts) > 0: + nodeManagerHost = nodeManagerHosts[0]; + for nmHost in nodeManagerHosts: + if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]: + nodeManagerHost = nmHost + host = nodeManagerHost["Hosts"] + cluster["referenceNodeManagerHost"] = host + else: + host = hosts["items"][0]["Hosts"] + cluster["referenceHost"] = host + cluster["cpu"] = host["cpu_count"] + cluster["disk"] = len(host["disk_info"]) + cluster["ram"] = int(host["total_mem"] / (1024 * 1024)) + + ramRecommendations = [ + {"os":1, "hbase":1}, + {"os":2, "hbase":1}, + {"os":2, "hbase":2}, + {"os":4, "hbase":4}, + {"os":6, "hbase":8}, + {"os":8, "hbase":8}, + {"os":8, "hbase":8}, + {"os":12, "hbase":16}, + {"os":24, "hbase":24}, + {"os":32, "hbase":32}, + {"os":64, "hbase":32} + ] + index = { + cluster["ram"] <= 4: 0, + 4 < cluster["ram"] <= 8: 1, + 8 < cluster["ram"] <= 16: 2, + 16 < cluster["ram"] <= 24: 3, + 24 < cluster["ram"] <= 48: 4, + 48 < cluster["ram"] <= 64: 5, + 64 < cluster["ram"] <= 72: 6, + 72 < cluster["ram"] <= 96: 7, + 96 < cluster["ram"] <= 128: 8, + 128 < cluster["ram"] <= 256: 9, + 256 < cluster["ram"]: 10 + }[1] + + + cluster["reservedRam"] = ramRecommendations[index]["os"] + cluster["hbaseRam"] = ramRecommendations[index]["hbase"] + + + cluster["minContainerSize"] = { + cluster["ram"] <= 4: 256, + 4 < cluster["ram"] <= 8: 512, + 8 < cluster["ram"] <= 24: 1024, + 24 < cluster["ram"]: 2048 + }[1] + + totalAvailableRam = cluster["ram"] - cluster["reservedRam"] + if cluster["hBaseInstalled"]: + totalAvailableRam -= cluster["hbaseRam"] + cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024) + '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))''' + cluster["containers"] = round(max(3, + min(2 * cluster["cpu"], + min(ceil(1.8 * cluster["disk"]), + cluster["totalAvailableRam"] / cluster["minContainerSize"])))) + + '''ramPerContainers = max(2GB, RAM - 
reservedRam - hBaseRam) / containers''' + cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"]) + '''If greater than 1GB, value will be in multiples of 512.''' + if cluster["ramPerContainer"] > 1024: + cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512 + + cluster["mapMemory"] = int(cluster["ramPerContainer"]) + cluster["reduceMemory"] = cluster["ramPerContainer"] + cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"]) + + return cluster def validateClusterConfigurations(self, configurations, services, hosts): validationItems = [] @@ -886,6 +982,12 @@ class DefaultStackAdvisor(StackAdvisor): return self.toConfigurationValidationProblems(validationItems, "") def toConfigurationValidationProblems(self, validationProblems, siteName): + """ + Encapsulate the validation item's fields of "level" and "message" for the given validation's config-name. + :param validationProblems: List of validation problems + :param siteName: Config type + :return: List of configuration validation problems that include additional fields like the log level. + """ result = [] for validationProblem in validationProblems: validationItem = validationProblem.get("item", None) @@ -947,7 +1049,31 @@ class DefaultStackAdvisor(StackAdvisor): self.validateMinMax(items, recommendedDefaults, configurations) return items + def validateListOfConfigUsingMethod(self, configurations, recommendedDefaults, services, hosts, validators): + """ + Service Advisors can use this method to pass in a list of validators, each of which is a tuple of a + a config type (string) and a function (pointer). Each validator is then executed. 
+ :param validators: List of tuples like [("hadoop-env", someFunctionPointer), ("hdfs-site", someFunctionPointer)] + :return: List of validation errors + """ + items = [] + for (configType, method) in validators: + if configType in recommendedDefaults: + siteProperties = self.getSiteProperties(configurations, configType) + if siteProperties is not None: + siteRecommendations = recommendedDefaults[configType]["properties"] + print("SiteName: %s, method: %s" % (configType, method.__name__)) + print("Site properties: %s" % str(siteProperties)) + print("Recommendations: %s" % str(siteRecommendations)) + validationItems = method(siteProperties, siteRecommendations, configurations, services, hosts) + items.extend(validationItems) + return items + def validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method): + """ + Deprecated, please use validateListOfConfigUsingMethod + :return: List of validation errors by calling the corresponding method. + """ if siteName in recommendedDefaults: siteProperties = self.getSiteProperties(configurations, siteName) if siteProperties is not None: @@ -1189,7 +1315,7 @@ class DefaultStackAdvisor(StackAdvisor): The scheme dictionary basically maps the number of hosts to host index where component should exist. 
""" - return {} + return self.componentLayoutSchemes def getWarnItem(self, message): """ @@ -1672,3 +1798,524 @@ class DefaultStackAdvisor(StackAdvisor): if recommendation: put_f(name, ",".join(recommendation)) + + def getHostNamesWithComponent(self, serviceName, componentName, services): + """ + Returns the list of hostnames on which service component is installed + """ + if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + return componentHostnames + return [] + + def getHostsWithComponent(self, serviceName, componentName, services, hosts): + if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: + service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] + components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] + if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): + componentHostnames = components[0]["StackServiceComponents"]["hostnames"] + componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] + return componentHosts + return [] + + def getHostWithComponent(self, serviceName, componentName, services, hosts): + componentHosts = self.getHostsWithComponent(serviceName, componentName, services, 
hosts) + if (len(componentHosts) > 0): + return componentHosts[0] + return None + + def getHostComponentsByCategories(self, hostname, categories, services, hosts): + components = [] + if services is not None and hosts is not None: + for service in services["services"]: + components.extend([componentEntry for componentEntry in service["components"] + if componentEntry["StackServiceComponents"]["component_category"] in categories + and hostname in componentEntry["StackServiceComponents"]["hostnames"]]) + return components + + def get_services_list(self, services): + """ + Returns available services as list + + :type services dict + :rtype list + """ + if not services: + return [] + + return [service["StackServices"]["service_name"] for service in services["services"]] + + def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations): + validationItems = [] + users = self.getHadoopProxyUsers(services, hosts, configurations) + for user_name, user_properties in users.iteritems(): + props = ["hadoop.proxyuser.{0}.hosts".format(user_name)] + if "propertyGroups" in user_properties: + props.append("hadoop.proxyuser.{0}.groups".format(user_name)) + + for prop in props: + validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)}) + + return validationItems + + def getHadoopProxyUsers(self, services, hosts, configurations): + """ + Gets Hadoop Proxy User recommendations based on the configuration that is provided by + getServiceHadoopProxyUsersConfigurationDict. 
+ + See getServiceHadoopProxyUsersConfigurationDict + """ + servicesList = self.get_services_list(services) + users = {} + + for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems(): + users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations)) + + return users + + def getServiceHadoopProxyUsersConfigurationDict(self): + """ + Returns a map that is used by 'getHadoopProxyUsers' to determine service + user properties and related components and get proxyuser recommendations. + This method can be overridden in stackadvisors for the further stacks to + add additional services or change the previous logic. + + Example of the map format: + { + "serviceName": [ + ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"}) + ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}), + ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction) + ], + "serviceName2": [ + ... + } + + If the third element of a tuple is map that maps proxy property to it's value. + The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional) + If the map value is a string, then this string will be used for the proxyuser + value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*'). + Otherwise map value should be alist or a tuple with component names. + All hosts with the provided components will be added + to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3') + + The forth element of the tuple is optional and if it's provided, + it should be a function that takes two arguments: services and hosts. + If it returns False, proxyusers for the tuple will not be added. 
+ """ + ALL_WILDCARD = "*" + HOSTS_PROPERTY = "propertyHosts" + GROUPS_PROPERTY = "propertyGroups" + + return { + "HDFS": [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})], + "OOZIE": [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})], + "HIVE": [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}), + ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})], + "YARN": [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)], + "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})], + "SPARK": [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})] + } + + def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations): + Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName)) + servicesList = self.get_services_list(services) + resultUsers = {} + + if serviceName in servicesList: + usersComponents = {} + for values in serviceUserComponents: + + # Filter, if 4th argument is present in the tuple + filterFunction = values[3:] + if filterFunction and not filterFunction[0](services, hosts): + continue + + userNameConfig, userNameProperty, hostSelectorMap = values[:3] + user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None) + if user: + usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap) + + for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems(): + proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty} + for proxyPropertyName, hostSelector in 
hostSelectorMap.iteritems(): + componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*' + if isinstance(hostSelector, (list, tuple)): + _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values + for component in hostSelector: + componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts) + if componentHosts is not None: + for componentHost in componentHosts: + componentHostName = componentHost["Hosts"]["host_name"] + componentHostNames.add(componentHostName) + + componentHostNamesString = ",".join(sorted(componentHostNames)) + Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString)) + + if not proxyPropertyName in proxyUsers: + proxyUsers[proxyPropertyName] = componentHostNamesString + + if not user in resultUsers: + resultUsers[user] = proxyUsers + + return resultUsers + + def recommendHadoopProxyUsers(self, configurations, services, hosts): + servicesList = self.get_services_list(services) + + if 'forced-configurations' not in services: + services["forced-configurations"] = [] + + putCoreSiteProperty = self.putProperty(configurations, "core-site", services) + putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site") + + users = self.getHadoopProxyUsers(services, hosts, configurations) + + # Force dependencies for HIVE + if "HIVE" in servicesList: + hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None) + if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None): + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)}) + + for user_name, user_properties in users.iteritems(): + + # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users + 
self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty) + Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"])) + if "propertyGroups" in user_properties: + self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty) + + # Remove old properties if user was renamed + userOldValue = self.getOldValue(services, user_properties["config"], user_properties["propertyName"]) + if userOldValue is not None and userOldValue != user_name: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true') + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)}) + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)}) + + if "propertyGroups" in user_properties: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true') + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)}) + services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)}) + + self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute) + + def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute): + if "HDFS" in servicesList: + ambari_user = self.getAmbariUser(services) + ambariHostName = socket.getfqdn() + self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty) + 
self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty) + old_ambari_user = self.getOldAmbariUser(services) + if old_ambari_user is not None: + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true') + putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true') + + def getAmbariUser(self, services): + ambari_user = services['ambari-server-properties']['ambari-server.user'] + if "cluster-env" in services["configurations"] \ + and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \ + and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": + ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] + ambari_user = ambari_user.split('@')[0] + return ambari_user + + def getOldAmbariUser(self, services): + ambari_user = None + if "cluster-env" in services["configurations"]: + if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": + ambari_user = services['ambari-server-properties']['ambari-server.user'] + elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]: + ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"] + ambari_user = ambari_user.split('@')[0] + return ambari_user + + def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None): + is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups) + result_value = "*" + result_values_set = self.merge_proxyusers_values(current_value, value) + 
if len(result_values_set) > 0: + result_value = ",".join(sorted([val for val in result_values_set if val])) + + if is_groups: + property_name = "hadoop.proxyuser.{0}.groups".format(user_name) + else: + property_name = "hadoop.proxyuser.{0}.hosts".format(user_name) + + put_function(property_name, result_value) + + def get_data_for_proxyuser(self, user_name, services, configurations, groups=False): + """ + Returns values of proxyuser properties for given user. Properties can be + hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts + :param user_name: + :param services: + :param groups: if true, will return values for group property, not hosts + :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was * + """ + if "core-site" in services["configurations"]: + coreSite = services["configurations"]["core-site"]['properties'] + else: + coreSite = {} + if groups: + property_name = "hadoop.proxyuser.{0}.groups".format(user_name) + else: + property_name = "hadoop.proxyuser.{0}.hosts".format(user_name) + if property_name in coreSite: + property_value = coreSite[property_name] + if property_value == "*": + return True, set() + else: + result_values = set() + if "core-site" in configurations: + if property_name in configurations["core-site"]['properties']: + result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(",")) + result_values = result_values.union(property_value.split(",")) + return False, result_values + return False, set() + + def merge_proxyusers_values(self, first, second): + result = set() + def append(data): + if isinstance(data, str) or isinstance(data, unicode): + if data != "*": + result.update(data.split(",")) + else: + result.update(data) + append(first) + append(second) + return result + + def getOldValue(self, services, configType, propertyName): + if services: + if 'changed-configurations' in services.keys(): + changedConfigs = 
services["changed-configurations"] + for changedConfig in changedConfigs: + if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig: + return changedConfig["old_value"] + return None + + @classmethod + def isSecurePort(cls, port): + """ + Returns True if port is root-owned at *nix systems + """ + if port is not None: + return port < 1024 + else: + return False + + @classmethod + def getPort(cls, address): + """ + Extracts port from the address like 0.0.0.0:1019 + """ + if address is None: + return None + m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address) + if m is not None: + return int(m.group(2)) + else: + return None + + def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName): + if propertyName not in recommendedDefaults: + # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the + # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it. 
# The functions below take ``self`` / ``cls`` as their first argument: they are
# methods whose enclosing ``class`` statement lies outside this excerpt.


def validatorEqualsPropertyItem(self, properties1, propertyName1,
                                properties2, propertyName2,
                                emptyAllowed=False):
    """Check that two configuration properties hold equal values.

    :param properties1: mapping expected to contain ``propertyName1``.
    :param propertyName1: key looked up in ``properties1``.
    :param properties2: mapping expected to contain ``propertyName2``.
    :param propertyName2: key looked up in ``properties2``.
    :param emptyAllowed: when False, a stored value of None is reported
        as an error.
    :return: the result of ``self.getErrorItem`` when a key is missing or a
        value is None (and ``emptyAllowed`` is False), the result of
        ``self.getWarnItem`` when the two values differ, otherwise None.
    """
    if propertyName1 not in properties1:
        return self.getErrorItem("Value should be set for %s" % propertyName1)
    if propertyName2 not in properties2:
        return self.getErrorItem("Value should be set for %s" % propertyName2)
    value1 = properties1.get(propertyName1)
    if value1 is None and not emptyAllowed:
        return self.getErrorItem("Empty value for %s" % propertyName1)
    value2 = properties2.get(propertyName2)
    if value2 is None and not emptyAllowed:
        return self.getErrorItem("Empty value for %s" % propertyName2)
    if value1 != value2:
        return self.getWarnItem("It is recommended to set equal values "
                                "for properties {0} and {1}".format(propertyName1, propertyName2))

    return None


def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
                                     propertyName):
    """Check that a property equals its recommended value.

    :param properties: mapping of the current property values.
    :param recommendedDefaults: mapping of the recommended property values.
    :param propertyName: key that must be present in both mappings.
    :return: the result of ``self.getErrorItem`` when the key is missing from
        either mapping, the result of ``self.getWarnItem`` when the two values
        differ, otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set for %s" % propertyName)
    value = properties.get(propertyName)
    if propertyName not in recommendedDefaults:
        return self.getErrorItem("Value should be recommended for %s" % propertyName)
    recommendedValue = recommendedDefaults.get(propertyName)
    if value != recommendedValue:
        return self.getWarnItem("It is recommended to set value {0} "
                                "for property {1}".format(recommendedValue, propertyName))
    return None


def validatorNotEmpty(self, properties, propertyName):
    """Check that a property is present and has a truthy value.

    :param properties: mapping of the current property values.
    :param propertyName: key to check.
    :return: the result of ``self.getErrorItem`` when the key is missing, the
        result of ``self.getWarnItem`` when the value is falsy (None, empty
        string, 0, ...), otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set for {0}".format(propertyName))
    value = properties.get(propertyName)
    if not value:
        return self.getWarnItem("Empty value for {0}".format(propertyName))
    return None


def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
    """Warn when a ``file://`` directory property lives on the root partition.

    The check is skipped (None is returned) when the value does not start
    with ``file://`` or when it equals the recommended default.

    :param properties: mapping of the current property values.
    :param recommendedDefaults: mapping of the recommended property values.
    :param propertyName: key of the directory property to check.
    :param hostInfo: mapping with a ``"disk_info"`` list whose entries carry a
        ``"mountpoint"`` key; it is also passed to
        ``self.getPreferredMountPoints``.
    :return: the result of ``self.getErrorItem`` when the key is missing, the
        result of ``self.getWarnItem`` when the directory resolves to ``"/"``
        and ``"/"`` is not the first preferred mount point, otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    path = properties[propertyName]
    if not path.startswith("file://") or path == recommendedDefaults.get(propertyName):
        return None

    path = re.sub("^file://", "", path, count=1)
    mountPoints = [diskInfo["mountpoint"] for diskInfo in hostInfo["disk_info"]]
    mountPoint = DefaultStackAdvisor.getMountPointForDir(path, mountPoints)

    if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
        return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))

    return None


def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
    """Warn when the partition holding a ``file://`` directory is too small.

    :param properties: mapping of the current property values.
    :param propertyName: key of the directory property to check.
    :param hostInfo: mapping with ``"host_name"`` and a ``"disk_info"`` list
        whose entries carry ``"mountpoint"`` and ``"available"`` keys.
    :param reqiuredDiskSpace: threshold compared against the ``"available"``
        number of the matching mount point (the spelling of the parameter
        name is kept for keyword callers).
    :return: the result of ``self.getErrorItem`` when the key is missing or
        the host reports no disks, the result of ``self.getWarnItem`` when
        the available space is below the threshold, otherwise None.
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    path = properties[propertyName]
    if not path.startswith("file://"):
        return None

    path = re.sub("^file://", "", path, count=1)
    availableByMount = {}
    for diskInfo in hostInfo["disk_info"]:
        availableByMount[diskInfo["mountpoint"]] = self.to_number(diskInfo["available"])

    if not availableByMount:
        return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])

    mountPoint = DefaultStackAdvisor.getMountPointForDir(path, availableByMount.keys())
    if mountPoint is None:
        # None of the reported mount points contains the directory, so there
        # is nothing to compare against; previously this raised KeyError.
        return None

    if availableByMount[mountPoint] < reqiuredDiskSpace:
        msg = "Ambari Metrics disk space requirements not met. \n" \
              "Recommended disk space for partition {0} is {1}G"
        return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576))  # in Gb
    return None


@classmethod
def is_valid_host_port_authority(cls, target):
    """Tell whether ``target`` parses to both a host name and a port.

    A placeholder scheme is prepended when ``target`` has no ``"://"`` so
    that ``urlparse`` treats the text as a network location.

    :param target: string such as ``"host:8080"`` or ``"http://host:8080"``.
    :return: True when both host name and port are present; False otherwise,
        including when ``urlparse`` or its ``port`` attribute raises
        ValueError.
    """
    if "://" not in target:
        target = "dummyscheme://" + target
    try:
        parsed = urlparse(target)
        return parsed.hostname is not None and parsed.port is not None
    except ValueError:
        return False


@classmethod
def getMountPointForDir(cls, dir, mountPoints):
    """Return the deepest mount point that contains ``dir``.

    :param dir: directory to check, even if it doesn't exist; a leading
        ``file://`` is removed, then the text is stripped and lower-cased.
    :param mountPoints: iterable of mount point paths.
    :return: the matching mount point with the greatest number of path
        separators, or None when ``dir`` is empty/None or no mount point
        is a prefix of it (for example a relative path or an ``hdfs://`` URI).
    """
    bestMountFound = None
    if dir:
        # NOTE(review): only ``dir`` is lower-cased; mount points are compared
        # as given, so one containing upper-case letters can never match.
        # Confirm whether that is intended.
        dir = re.sub("^file://", "", dir, count=1).strip().lower()
        # Both sides get a trailing separator so that the mount point
        # "/hadoop" does not match the path "/hadoop1".
        dirWithSep = os.path.join(dir, "")

        # If the path is "/hadoop/hdfs/data", then possible matches for mounts
        # could be "/", "/hadoop/hdfs", and "/hadoop/hdfs/data".
        # So take the one with the greatest number of segments.
        for mountPoint in mountPoints:
            mountWithSep = os.path.join(mountPoint, "")
            if not dirWithSep.startswith(mountWithSep):
                continue
            if bestMountFound is None:
                bestMountFound = mountPoint
            elif os.path.join(bestMountFound, "").count(os.path.sep) < mountWithSep.count(os.path.sep):
                bestMountFound = mountPoint

    return bestMountFound


def validateMinMemorySetting(self, properties, defaultValue, propertyName):
    """Warn when a memory property is below its recommended default.

    :param properties: mapping of the current property values.
    :param defaultValue: recommended minimum; converted with
        ``int(str(defaultValue).strip())``.
    :param propertyName: key of the property to check.
    :return: the result of ``self.getErrorItem`` when the key is missing or
        either value is None, the result of ``self.getWarnItem`` when the
        value is smaller than the default, otherwise None. Values that cannot
        be converted or compared are skipped (None).
    """
    if propertyName not in properties:
        return self.getErrorItem("Value should be set")
    if defaultValue is None:
        return self.getErrorItem("Config's default value can't be null or undefined")

    value = properties[propertyName]
    if value is None:
        return self.getErrorItem("Value can't be null or undefined")
    try:
        valueInt = self.to_number(value)
        # TODO: generify for other use cases
        defaultValueInt = int(str(defaultValue).strip())
        if valueInt < defaultValueInt:
            return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
    except (TypeError, ValueError):
        # Best effort: a value that cannot be converted or compared is not
        # reported. Narrowed from a bare ``except`` so that SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return None

    return None


@classmethod
def checkXmxValueFormat(cls, value):
    """Tell whether ``value`` contains exactly one ``-Xmx<digits>`` option.

    :param value: JVM options string.
    :return: True when the pattern occurs exactly once, otherwise False.
    """
    pattern = re.compile(r'-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
    return len(pattern.findall(value)) == 1


@classmethod
def getXmxSize(cls, value):
    """Extract the size that follows the first ``-Xmx`` in ``value``.

    :param value: JVM options string containing ``-Xmx<digits>``.
    :return: the digits followed by the next character lower-cased; that
        character is whatever follows the digits (a size suffix, a space, or
        nothing at the end of the string).
    :raises IndexError: when ``value`` contains no ``-Xmx<digits>``.
    """
    digits, suffix = re.findall(r"-Xmx(\d+)(.?)", value)[0]
    return digits + suffix.lower()


@classmethod
def formatXmxSizeToBytes(cls, value):
    """Convert a size such as ``"512m"`` into a number of bytes.

    :param value: digits optionally followed by one of b/k/m/g/t/p in either
        case; a trailing space or digit means bytes.
    :return: 0 for an empty string, otherwise ``cls.to_number(value)``
        multiplied by the factor of the suffix.
    :raises KeyError: when the last character is not a known suffix.
    """
    value = value.lower()
    if len(value) == 0:
        return 0
    modifier = value[-1]

    if modifier == ' ' or modifier in "0123456789":
        modifier = 'b'
    multipliers = {
        'b': 1,
        'k': 1024,
        'm': 1024 * 1024,
        'g': 1024 * 1024 * 1024,
        't': 1024 * 1024 * 1024 * 1024,
        'p': 1024 * 1024 * 1024 * 1024 * 1024,
    }
    multiplier = multipliers[modifier]
    return cls.to_number(value) * multiplier


@classmethod
def to_number(cls, s):
    """Return the digits of ``s`` as an int, ignoring every other character.

    Because all non-digits are dropped, signs and decimal points are lost
    ("-5" gives 5 and "1.5" gives 15).

    :param s: string to convert; it is handed to ``re.sub`` unchanged.
    :return: the int built from the digits, or None when ``s`` has no digits.
    """
    try:
        return int(re.sub(r"\D", "", s))
    except ValueError:
        return None
hosts= { "items": [ - {"Hosts": {"host_name": "host1", "maintenance_state":"ON"}}, - {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}} + {"Hosts": {"host_name": "host1", + "maintenance_state":"ON", + "cpu_count": 1, + "total_mem": 2097152, + "disk_info": [{ + "size": '80000000', + "mountpoint": "/" + }] + } + }, + {"Hosts": {"host_name": "host2", + "maintenance_state":"ON", + "cpu_count": 1, + "total_mem": 2097152, + "disk_info": [{ + "size": '80000000', + "mountpoint": "/" + }] + } + } ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py index 1145154..a486fb3 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py +++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py @@ -3253,11 +3253,11 @@ class TestHDP206StackAdvisor(TestCase): self.assertEquals(self.stack_advisor_impl.round_to_n(4097), 4096) def test_getMountPointForDir(self): - self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/"]), "/") - self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/var", "/"]), "/var") - self.assertEquals(self.stack_advisor_impl.getMountPointForDir("file:///var/log", ["/var", "/"]), "/var") - self.assertEquals(self.stack_advisor_impl.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None) - self.assertEquals(self.stack_advisor_impl.getMountPointForDir("relative/path", ["/var", "/"]), None) + self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/"]), "/") + self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/var", "/"]), "/var") + self.assertEquals(self.stackAdvisor.getMountPointForDir("file:///var/log", ["/var", "/"]), 
"/var") + self.assertEquals(self.stackAdvisor.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None) + self.assertEquals(self.stackAdvisor.getMountPointForDir("relative/path", ["/var", "/"]), None) def test_parseCardinality(self): self.assertEquals(self.stackAdvisor.parseCardinality("ALL", 5), (5, 5)) http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py index f9fb1f5..e8bd5d0 100644 --- a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py +++ b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py @@ -168,11 +168,8 @@ class TestHDP21StackAdvisor(TestCase): if len(components) > 0: groups.append(components) - def sort_nested_lists(list): - result_list = [] - for sublist in list: - result_list.append(sorted(sublist)) - return sorted(result_list) + def sort_nested_lists(l): + return sorted(reduce(lambda x,y: x+y, l)) self.assertEquals(sort_nested_lists(expected_layout), sort_nested_lists(groups))
