AMBARI-19097. HDP 3.0 TP - create Service Advisor for HDFS (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/326cc1b2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/326cc1b2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/326cc1b2 Branch: refs/heads/trunk Commit: 326cc1b2a1e05073050a38ea104d6d63ed76f373 Parents: 5bdd6cf Author: Alejandro Fernandez <afernan...@hortonworks.com> Authored: Tue Jan 10 12:55:50 2017 -0800 Committer: Alejandro Fernandez <afernan...@hortonworks.com> Committed: Mon Jan 16 17:47:16 2017 -0800 ---------------------------------------------------------------------- .../HDFS/3.0.0.3.0/service_advisor.py | 602 +++++++++++++++++ .../ZOOKEEPER/3.4.9/service_advisor.py | 49 +- .../stacks/BIGTOP/0.8/services/stack_advisor.py | 63 +- .../stacks/HDP/2.0.6/services/stack_advisor.py | 553 ++-------------- .../stacks/HDP/2.1/services/stack_advisor.py | 31 +- .../stacks/HDP/2.2/services/stack_advisor.py | 39 +- .../stacks/HDPWIN/2.1/services/stack_advisor.py | 47 +- .../stacks/HDPWIN/2.2/services/stack_advisor.py | 27 +- .../src/main/resources/stacks/stack_advisor.py | 651 ++++++++++++++++++- .../src/test/python/TestStackAdvisor.py | 52 +- .../stacks/2.0.6/common/test_stack_advisor.py | 10 +- .../stacks/2.1/common/test_stack_advisor.py | 7 +- 12 files changed, 1486 insertions(+), 645 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py new file mode 100644 index 0000000..eb7f35c --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py @@ -0,0 +1,602 @@ +#!/usr/bin/env ambari-python-wrap +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Python imports +import imp +import os +import traceback +import inspect +import re +import socket +from urlparse import urlparse + +# Local imports +from resource_management.core.logger import Logger +from resource_management.libraries.functions.data_structure_utils import get_from_dict +from resource_management.libraries.functions.mounted_dirs_helper import get_mounts_with_multiple_data_dirs + + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/') +PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py') + +try: + with open(PARENT_FILE, 'rb') as fp: + service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE)) +except Exception as e: + traceback.print_exc() + print "Failed to load parent" + + +class HDFSServiceAdvisor(service_advisor.ServiceAdvisor): + + def __init__(self, *args, **kwargs): + self.as_super = super(HDFSServiceAdvisor, self) + self.as_super.__init__(*args, **kwargs) + + # Always call these methods + self.modifyMastersWithMultipleInstances() + self.modifyCardinalitiesDict() + self.modifyHeapSizeProperties() + self.modifyNotValuableComponents() + self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() + + def modifyMastersWithMultipleInstances(self): + """ + Modify the set of masters with multiple instances. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyCardinalitiesDict(self): + """ + Modify the dictionary of cardinalities. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyHeapSizeProperties(self): + """ + Modify the dictionary of heap size properties. + Must be overriden in child class. + """ + self.heap_size_properties = {"NAMENODE": + [{"config-name": "hadoop-env", + "property": "namenode_heapsize", + "default": "1024m"}], + "SECONDARY_NAMENODE": + [{"config-name": "hadoop-env", + "property": "namenode_heapsize", + "default": "1024m"}], + "DATANODE": + [{"config-name": "hadoop-env", + "property": "dtnode_heapsize", + "default": "1024m"}]} + + def modifyNotValuableComponents(self): + """ + Modify the set of components whose host assignment is based on other services. + Must be overriden in child class. + """ + self.notValuableComponents |= set(['JOURNALNODE', 'ZKFC']) + + def modifyComponentsNotPreferableOnServer(self): + """ + Modify the set of components that are not preferable on the server. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + self.componentLayoutSchemes.update({ + 'NAMENODE': {"else": 0}, + 'SECONDARY_NAMENODE': {"else": 1} + }) + + def getServiceComponentLayoutValidations(self, services, hosts): + """ + Get a list of errors. + Must be overriden in child class. + """ + Logger.info("Class: %s, Method: %s. Validating Service Component Layout." % + (self.__class__.__name__, inspect.stack()[0][3])) + + # HDFS allows NameNode and Secondary NameNode to be on the same host. + return self.as_super.getServiceComponentLayoutValidations(services, hosts) + + def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): + """ + Entry point. + Must be overriden in child class. + """ + Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) + + # Due to the existing stack inheritance, make it clear where each calculation came from. + recommender = HDFSRecommender() + recommender.recommendConfigurationsFromHDP206(configurations, clusterData, services, hosts) + recommender.recommendConfigurationsFromHDP23(configurations, clusterData, services, hosts) + + def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts): + """ + Entry point. + Validate configurations for the service. Return a list of errors. + The code for this function should be the same for each Service Advisor. + """ + Logger.info("Class: %s, Method: %s. Validating Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) + + validator = HDFSValidator() + # Calls the methods of the validator using arguments, + # method(siteProperties, siteRecommendations, configurations, services, hosts) + return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators) + +class HDFSRecommender(service_advisor.ServiceAdvisor): + """ + HDFS Recommender suggests properties when adding the service for the first time or modifying configs via the UI. + """ + + def __init__(self, *args, **kwargs): + self.as_super = super(HDFSRecommender, self) + self.as_super.__init__(*args, **kwargs) + + def recommendConfigurationsFromHDP206(self, configurations, clusterData, services, hosts): + """ + Recommend configurations for this service based on HDP 2.0.6. + """ + Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) + + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services) + putHDFSSitePropertyAttributes = self.putPropertyAttribute(configurations, "hdfs-site") + + totalAvailableRam = clusterData['totalAvailableRam'] + Logger.info("Class: %s, Method: %s. Total Available Ram: %s" % (self.__class__.__name__, inspect.stack()[0][3], str(totalAvailableRam))) + putHDFSProperty('namenode_heapsize', max(int(totalAvailableRam / 2), 1024)) + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSProperty('namenode_opt_newsize', max(int(totalAvailableRam / 8), 128)) + putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) + putHDFSProperty('namenode_opt_maxnewsize', max(int(totalAvailableRam / 8), 256)) + + # Check if NN HA is enabled and recommend removing dfs.namenode.rpc-address + hdfsSiteProperties = self.getServicesSiteProperties(services, "hdfs-site") + nameServices = None + if hdfsSiteProperties and 'dfs.internal.nameservices' in hdfsSiteProperties: + nameServices = hdfsSiteProperties['dfs.internal.nameservices'] + if nameServices is None and hdfsSiteProperties and 'dfs.nameservices' in hdfsSiteProperties: + nameServices = hdfsSiteProperties['dfs.nameservices'] + if nameServices and "dfs.ha.namenodes.%s" % nameServices in hdfsSiteProperties: + namenodes = hdfsSiteProperties["dfs.ha.namenodes.%s" % nameServices] + if len(namenodes.split(',')) > 1: + putHDFSSitePropertyAttributes("dfs.namenode.rpc-address", "delete", "true") + + Logger.info("Class: %s, Method: %s. HDFS nameservices: %s" % + (self.__class__.__name__, inspect.stack()[0][3], str(nameServices))) + + hdfs_mount_properties = [ + ("dfs.datanode.data.dir", "DATANODE", "/hadoop/hdfs/data", "multi"), + ("dfs.namenode.name.dir", "DATANODE", "/hadoop/hdfs/namenode", "multi"), + ("dfs.namenode.checkpoint.dir", "SECONDARY_NAMENODE", "/hadoop/hdfs/namesecondary", "single") + ] + + Logger.info("Class: %s, Method: %s. Updating HDFS mount properties." % + (self.__class__.__name__, inspect.stack()[0][3])) + self.updateMountProperties("hdfs-site", hdfs_mount_properties, configurations, services, hosts) + + dataDirs = [] + if configurations and "hdfs-site" in configurations and \ + "dfs.datanode.data.dir" in configurations["hdfs-site"]["properties"] and \ + configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"] is not None: + dataDirs = configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"].split(",") + + elif hdfsSiteProperties and "dfs.datanode.data.dir" in hdfsSiteProperties and \ + hdfsSiteProperties["dfs.datanode.data.dir"] is not None: + dataDirs = hdfsSiteProperties["dfs.datanode.data.dir"].split(",") + + Logger.info("Class: %s, Method: %s. HDFS Data Dirs: %s" % + (self.__class__.__name__, inspect.stack()[0][3], str(dataDirs))) + + # dfs.datanode.du.reserved should be set to 10-15% of volume size + # For each host selects maximum size of the volume. Then gets minimum for all hosts. + # This ensures that each host will have at least one data dir with available space. + reservedSizeRecommendation = 0l #kBytes + for host in hosts["items"]: + mountPoints = [] + mountPointDiskAvailableSpace = [] #kBytes + for diskInfo in host["Hosts"]["disk_info"]: + mountPoints.append(diskInfo["mountpoint"]) + mountPointDiskAvailableSpace.append(long(diskInfo["size"])) + + maxFreeVolumeSizeForHost = 0l #kBytes + for dataDir in dataDirs: + mp = HDFSRecommender.getMountPointForDir(dataDir, mountPoints) + for i in range(len(mountPoints)): + if mp == mountPoints[i]: + if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost: + maxFreeVolumeSizeForHost = mountPointDiskAvailableSpace[i] + + if (not reservedSizeRecommendation) or (maxFreeVolumeSizeForHost and maxFreeVolumeSizeForHost < reservedSizeRecommendation): + reservedSizeRecommendation = maxFreeVolumeSizeForHost + + Logger.info("Class: %s, Method: %s. HDFS Datanode recommended reserved size: %d" % + (self.__class__.__name__, inspect.stack()[0][3], reservedSizeRecommendation)) + + if reservedSizeRecommendation: + reservedSizeRecommendation = max(reservedSizeRecommendation * 1024 / 8, 1073741824) # At least 1Gb is reserved + putHDFSSiteProperty('dfs.datanode.du.reserved', reservedSizeRecommendation) #Bytes + + # recommendations for "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" properties in core-site + self.recommendHadoopProxyUsers(configurations, services, hosts) + + def recommendConfigurationsFromHDP23(self, configurations, clusterData, services, hosts): + """ + Recommend configurations for this service based on HDP 2.3. + """ + putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services) + putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") + + if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']): + rangerPluginEnabled = '' + if 'ranger-hdfs-plugin-properties' in configurations and 'ranger-hdfs-plugin-enabled' in configurations['ranger-hdfs-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled'] + elif 'ranger-hdfs-plugin-properties' in services['configurations'] and 'ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled'] + + if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): + putHdfsSiteProperty("dfs.namenode.inode.attributes.provider.class",'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer') + else: + putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true') + else: + putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true') + + +class HDFSValidator(service_advisor.ServiceAdvisor): + """ + HDFS Validator checks the correctness of properties whenever the service is first added or the user attempts to + change configs via the UI. + """ + + def __init__(self, *args, **kwargs): + self.as_super = super(HDFSValidator, self) + self.as_super.__init__(*args, **kwargs) + + self.validators = [("hdfs-site", self.validateHDFSConfigurationsFromHDP206), + ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP206), + ("core-site", self.validateHDFSCoreSiteFromHDP206), + ("hdfs-site", self.validateHDFSConfigurationsFromHDP22), + ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP22), + ("ranger-hdfs-plugin-properties", self.validateHDFSRangerPluginConfigurationsFromHDP22), + ("hdfs-site", self.validateRangerAuthorizerFromHDP23)] + + # ********************************************************** + # Example of how to add a function that validates a certain config type. + # If the same config type has multiple functions, can keep adding tuples to self.validators + #self.validators.append(("hadoop-env", self.sampleValidator)) + + def sampleValidator(self, properties, recommendedDefaults, configurations, services, hosts): + """ + Example of a validator function other other Service Advisors to emulate. + :return: A list of configuration validation problems. + """ + validationItems = [] + + ''' + Item is a simple dictionary. + Two functions can be used to construct it depending on the log level: WARN|ERROR + E.g., + self.getErrorItem(message) or self.getWarnItem(message) + + item = {"level": "ERROR|WARN", "message": "value"} + ''' + validationItems.append({"config-name": "my_config_property_name", + "item": self.getErrorItem("My custom message in method %s" % inspect.stack()[0][3])}) + return self.toConfigurationValidationProblems(validationItems, "hadoop-env") + + def validateHDFSConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.0.6; validate hdfs-site + :return: A list of configuration validation problems. + """ + clusterEnv = self.getSiteProperties(configurations, "cluster-env") + validationItems = [{"config-name": 'dfs.datanode.du.reserved', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'dfs.datanode.du.reserved')}, + {"config-name": 'dfs.datanode.data.dir', "item": self.validatorOneDataDirPerPartition(properties, 'dfs.datanode.data.dir', services, hosts, clusterEnv)}] + return self.toConfigurationValidationProblems(validationItems, "hdfs-site") + + def validatorOneDataDirPerPartition(self, properties, propertyName, services, hosts, clusterEnv): + if not propertyName in properties: + return self.getErrorItem("Value should be set") + dirs = properties[propertyName] + + if not (clusterEnv and "one_dir_per_partition" in clusterEnv and clusterEnv["one_dir_per_partition"].lower() == "true"): + return None + + dataNodeHosts = self.getDataNodeHosts(services, hosts) + + warnings = set() + for host in dataNodeHosts: + hostName = host["Hosts"]["host_name"] + + mountPoints = [] + for diskInfo in host["Hosts"]["disk_info"]: + mountPoints.append(diskInfo["mountpoint"]) + + if get_mounts_with_multiple_data_dirs(mountPoints, dirs): + # A detailed message can be too long on large clusters: + # warnings.append("Host: " + hostName + "; Mount: " + mountPoint + "; Data directories: " + ", ".join(dirList)) + warnings.add(hostName) + break + + if len(warnings) > 0: + return self.getWarnItem("cluster-env/one_dir_per_partition is enabled but there are multiple data directories on the same mount. Affected hosts: {0}".format(", ".join(sorted(warnings)))) + + return None + + def validateHadoopEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.0.6; validate hadoop-env + :return: A list of configuration validation problems. + """ + validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, + {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, + {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] + return self.toConfigurationValidationProblems(validationItems, "hadoop-env") + + def validateHDFSCoreSiteFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.0.6; validate core-site + :return: A list of configuration validation problems. + """ + validationItems = [] + validationItems.extend(self.getHadoopProxyUsersValidationItems(properties, services, hosts, configurations)) + validationItems.extend(self.getAmbariProxyUsersForHDFSValidationItems(properties, services)) + return self.toConfigurationValidationProblems(validationItems, "core-site") + + def getAmbariProxyUsersForHDFSValidationItems(self, properties, services): + validationItems = [] + servicesList = self.get_services_list(services) + + if "HDFS" in servicesList: + ambari_user = self.getAmbariUser(services) + props = ( + "hadoop.proxyuser.{0}.hosts".format(ambari_user), + "hadoop.proxyuser.{0}.groups".format(ambari_user) + ) + for prop in props: + validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)}) + + return validationItems + + def validateHDFSConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.2; validate hdfs-site + :return: A list of configuration validation problems. + """ + # We can not access property hadoop.security.authentication from the + # other config (core-site). That's why we are using another heuristic here + hdfs_site = properties + core_site = self.getSiteProperties(configurations, "core-site") + + dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer' # Hadoop Wire encryption + wire_encryption_enabled = False + try: + wire_encryption_enabled = hdfs_site[dfs_encrypt_data_transfer] == "true" + except KeyError: + pass + + HTTP_ONLY = 'HTTP_ONLY' + HTTPS_ONLY = 'HTTPS_ONLY' + HTTP_AND_HTTPS = 'HTTP_AND_HTTPS' + + VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS] + VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy'] + + validationItems = [] + address_properties = [ + # "dfs.datanode.address", + # "dfs.datanode.http.address", + # "dfs.datanode.https.address", + # "dfs.datanode.ipc.address", + # "dfs.journalnode.http-address", + # "dfs.journalnode.https-address", + # "dfs.namenode.rpc-address", + # "dfs.namenode.secondary.http-address", + "dfs.namenode.http-address", + "dfs.namenode.https-address", + ] + #Validating *address properties for correct values + + for address_property in address_properties: + if address_property in hdfs_site: + value = hdfs_site[address_property] + if not HDFSValidator.is_valid_host_port_authority(value): + validationItems.append({"config-name" : address_property, "item" : + self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)}) + + #Adding Ranger Plugin logic here + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No' + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()): + if 'dfs.permissions.enabled' in hdfs_site and \ + hdfs_site['dfs.permissions.enabled'] != 'true': + validationItems.append({"config-name": 'dfs.permissions.enabled', + "item": self.getWarnItem("dfs.permissions.enabled needs to be set to true if Ranger HDFS Plugin is enabled.")}) + + if (not wire_encryption_enabled and # If wire encryption is enabled at Hadoop, it disables all our checks + 'hadoop.security.authentication' in core_site and + core_site['hadoop.security.authentication'] == 'kerberos' and + 'hadoop.security.authorization' in core_site and + core_site['hadoop.security.authorization'] == 'true'): + # security is enabled + + dfs_http_policy = 'dfs.http.policy' + dfs_datanode_address = 'dfs.datanode.address' + datanode_http_address = 'dfs.datanode.http.address' + datanode_https_address = 'dfs.datanode.https.address' + data_transfer_protection = 'dfs.data.transfer.protection' + + try: # Params may be absent + privileged_dfs_dn_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[dfs_datanode_address])) + except KeyError: + privileged_dfs_dn_port = False + try: + privileged_dfs_http_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_http_address])) + except KeyError: + privileged_dfs_http_port = False + try: + privileged_dfs_https_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_https_address])) + except KeyError: + privileged_dfs_https_port = False + try: + dfs_http_policy_value = hdfs_site[dfs_http_policy] + except KeyError: + dfs_http_policy_value = HTTP_ONLY # Default + try: + data_transfer_protection_value = hdfs_site[data_transfer_protection] + except KeyError: + data_transfer_protection_value = None + + if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES: + validationItems.append({"config-name": dfs_http_policy, + "item": self.getWarnItem( + "Invalid property value: {0}. Valid values are {1}".format( + dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))}) + + # determine whether we use secure ports + address_properties_with_warnings = [] + if dfs_http_policy_value == HTTPS_ONLY: + if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site): + important_properties = [dfs_datanode_address, datanode_https_address] + message = "You set up datanode to use some non-secure ports. " \ + "If you want to run Datanode under non-root user in a secure cluster, " \ + "you should set all these properties {2} " \ + "to use non-secure ports (if property {3} does not exist, " \ + "just add it). You may also set up property {4} ('{5}' is a good default value). " \ + "Also, set up WebHDFS with SSL as " \ + "described in manual in order to be able to " \ + "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties, + datanode_https_address, data_transfer_protection, + VALID_TRANSFER_PROTECTION_VALUES[0]) + address_properties_with_warnings.extend(important_properties) + else: # dfs_http_policy_value == HTTP_AND_HTTPS or HTTP_ONLY + # We don't enforce datanode_https_address to use privileged ports here + any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port + if any_nonprivileged_ports_are_in_use: + important_properties = [dfs_datanode_address, datanode_http_address] + message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \ + "In a secure cluster, Datanode forbids using non-secure ports " \ + "if {0} is not set to {3}. " \ + "Please make sure that properties {2} use secure ports.".format( + dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY) + address_properties_with_warnings.extend(important_properties) + + # Generate port-related warnings if any + for prop in address_properties_with_warnings: + validationItems.append({"config-name": prop, + "item": self.getWarnItem(message)}) + + # Check if it is appropriate to use dfs.data.transfer.protection + if data_transfer_protection_value is not None: + if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]: + validationItems.append({"config-name": data_transfer_protection, + "item": self.getWarnItem( + "{0} property can not be used when {1} is set to any " + "value other then {2}. Tip: When {1} property is not defined, it defaults to {3}".format( + data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))}) + elif not data_transfer_protection_value in VALID_TRANSFER_PROTECTION_VALUES: + validationItems.append({"config-name": data_transfer_protection, + "item": self.getWarnItem( + "Invalid property value: {0}. Valid values are {1}.".format( + data_transfer_protection_value, VALID_TRANSFER_PROTECTION_VALUES))}) + return self.toConfigurationValidationProblems(validationItems, "hdfs-site") + + def validateHadoopEnvConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.2; validate hadoop-env + :return: A list of configuration validation problems. + """ + validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')}, + {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')}, + {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}] + return self.toConfigurationValidationProblems(validationItems, "hadoop-env") + + def validateHDFSRangerPluginConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.2; validate ranger-hdfs-plugin-properties + :return: A list of configuration validation problems. + """ + validationItems = [] + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No' + if ranger_plugin_enabled.lower() == 'yes': + # ranger-hdfs-plugin must be enabled in ranger-env + ranger_env = self.getServicesSiteProperties(services, 'ranger-env') + if not ranger_env or not 'ranger-hdfs-plugin-enabled' in ranger_env or ranger_env['ranger-hdfs-plugin-enabled'].lower() != 'yes': + validationItems.append({"config-name": 'ranger-hdfs-plugin-enabled', + "item": self.getWarnItem( + "ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled must correspond ranger-env/ranger-hdfs-plugin-enabled")}) + return self.toConfigurationValidationProblems(validationItems, "ranger-hdfs-plugin-properties") + + def validateRangerAuthorizerFromHDP23(self, properties, recommendedDefaults, configurations, services, hosts): + """ + This was copied from HDP 2.3 + If Ranger service is present and the ranger plugin is enabled, check that the provider class is correctly set. + :return: A list of configuration validation problems. + """ + Logger.info("Class: %s, Method: %s. Checking if Ranger service is present and if the provider class is using the Ranger Authorizer." % + (self.__class__.__name__, inspect.stack()[0][3])) + # We can not access property hadoop.security.authentication from the + # other config (core-site). That's why we are using another heuristics here + hdfs_site = properties + validationItems = [] #Adding Ranger Plugin logic here + ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties") + ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No' + servicesList = self.getServiceNames(services) + if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'yes'): + + try: + if hdfs_site['dfs.namenode.inode.attributes.provider.class'].lower() != 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer'.lower(): + raise ValueError() + except (KeyError, ValueError), e: + message = "dfs.namenode.inode.attributes.provider.class needs to be set to 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer' if Ranger HDFS Plugin is enabled." + validationItems.append({"config-name": 'dfs.namenode.inode.attributes.provider.class', + "item": self.getWarnItem(message)}) + + return self.toConfigurationValidationProblems(validationItems, "hdfs-site") + + def getDataNodeHosts(self, services, hosts): + """ + Returns the list of Data Node hosts. If none, return an empty list. + """ + if len(hosts["items"]) > 0: + dataNodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts) + if dataNodeHosts is not None: + return dataNodeHosts + return [] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py index 82316f4..4174b9c 100644 --- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py +++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py @@ -48,6 +48,9 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor): self.modifyMastersWithMultipleInstances() self.modifyCardinalitiesDict() self.modifyHeapSizeProperties() + self.modifyNotValuableComponents() + self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -72,22 +75,46 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor): "property": "zk_server_heapsize", "default": "1024m"}]} + def modifyNotValuableComponents(self): + """ + Modify the set of components whose host assignment is based on other services. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyComponentsNotPreferableOnServer(self): + """ + Modify the set of components that are not preferable on the server. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + # Nothing to do + pass + def getServiceComponentLayoutValidations(self, services, hosts): """ Get a list of errors. Zookeeper does not have any validations in this version. """ - service_name = services["services"][0]["StackServices"]["service_name"] - Logger.info("Class: %s, Method: %s. Validating Service Component Layout for Service: %s." % - (self.__class__.__name__, inspect.stack()[0][3], service_name)) + Logger.info("Class: %s, Method: %s. Validating Service Component Layout." % + (self.__class__.__name__, inspect.stack()[0][3])) return self.as_super.getServiceComponentLayoutValidations(services, hosts) def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): """ Recommend configurations to set. Zookeeper does not have any recommendations in this version. """ - service_name = services["services"][0]["StackServices"]["service_name"] - Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." % - (self.__class__.__name__, inspect.stack()[0][3], service_name)) + Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) self.recommendConfigurations(configurations, clusterData, services, hosts) @@ -95,9 +122,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor): """ Recommend configurations for this service. """ - service_name = services["services"][0]["StackServices"]["service_name"] - Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." % - (self.__class__.__name__, inspect.stack()[0][3], service_name)) + Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) Logger.info("Setting zoo.cfg to default dataDir to /hadoop/zookeeper on the best matching mount") @@ -110,9 +136,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor): """ Validate configurations for the service. Return a list of errors. """ - service_name = services["services"][0]["StackServices"]["service_name"] - Logger.info("Class: %s, Method: %s. Validating Configurations for Service: %s." % - (self.__class__.__name__, inspect.stack()[0][3], service_name)) + Logger.info("Class: %s, Method: %s. Validating Configurations." % + (self.__class__.__name__, inspect.stack()[0][3])) items = [] http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py index 5172042..6ef74d2 100644 --- a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py @@ -37,6 +37,7 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor): self.modifyHeapSizeProperties() self.modifyNotValuableComponents() self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -79,6 +80,28 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor): """ self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER']) + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + self.componentLayoutSchemes = { + 'NAMENODE': {"else": 0}, + 'SECONDARY_NAMENODE': {"else": 1}, + 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, + + 'HISTORYSERVER': {31: 1, "else": 2}, + 'RESOURCEMANAGER': {31: 1, "else": 2}, + + 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, + + 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, + 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, + 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, + } + def getComponentLayoutValidations(self, services, hosts): """Returns array of Validation objects about issues with hostnames components assigned to""" items = [] @@ -330,23 +353,6 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor): {"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ] return self.toConfigurationValidationProblems(validationItems, "yarn-site") - # TODO, move to Service Advisors. - def getComponentLayoutSchemes(self): - return { - 'NAMENODE': {"else": 0}, - 'SECONDARY_NAMENODE': {"else": 1}, - 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, - - 'HISTORYSERVER': {31: 1, "else": 2}, - 'RESOURCEMANAGER': {31: 1, "else": 2}, - - 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, - - 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, - 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, - 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, - } - class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor): def __init__(self): @@ -357,6 +363,7 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor): self.modifyHeapSizeProperties() self.modifyNotValuableComponents() self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -396,6 +403,18 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor): """ self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS']) + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + self.componentLayoutSchemes.update({ + 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, + 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} + }) + def getServiceConfigurationRecommenderDict(self): parentRecommendConfDict = super(BIGTOP08StackAdvisor, self).getServiceConfigurationRecommenderDict() childRecommendConfDict = { @@ -430,16 +449,6 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor): "-server -Xmx" + str(int(0.8 * clusterData["amMemory"])) + "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC") - # TODO, move to Service Advisors. - def getComponentLayoutSchemes(self): - parentSchemes = super(BIGTOP08StackAdvisor, self).getComponentLayoutSchemes() - childSchemes = { - 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, - 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} - } - parentSchemes.update(childSchemes) - return parentSchemes - def getServiceConfigurationValidators(self): parentValidators = super(BIGTOP08StackAdvisor, self).getServiceConfigurationValidators() childValidators = { http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index 9816702..3596798 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -42,6 +42,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): self.modifyHeapSizeProperties() self.modifyNotValuableComponents() self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -84,6 +85,29 @@ class HDP206StackAdvisor(DefaultStackAdvisor): """ self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER', 'METRICS_COLLECTOR']) + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + self.componentLayoutSchemes.update({ + 'NAMENODE': {"else": 0}, + 'SECONDARY_NAMENODE': {"else": 1}, + 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, + + 'HISTORYSERVER': {31: 1, "else": 2}, + 'RESOURCEMANAGER': {31: 1, "else": 2}, + + 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, + + 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, + 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, + 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, + 'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5}, + }) + def getComponentLayoutValidations(self, services, hosts): """Returns array of Validation objects about issues with hostnames components assigned to""" items = super(HDP206StackAdvisor, self).getComponentLayoutValidations(services, hosts) @@ -248,17 +272,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): ambari_user = ambari_user.split('@')[0] return ambari_user - def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute): - if "HDFS" in servicesList: - ambari_user = self.getAmbariUser(services) - ambariHostName = socket.getfqdn() - self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty) - self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty) - old_ambari_user = self.getOldAmbariUser(services) - if old_ambari_user is not None: - putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true') - putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true') - def getAmbariProxyUsersForHDFSValidationItems(self, properties, services): validationItems = [] servicesList = self.get_services_list(services) @@ -274,216 +287,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return validationItems - def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations): - Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName)) - servicesList = self.get_services_list(services) - resultUsers = {} - - if serviceName in servicesList: - usersComponents = {} - for values in serviceUserComponents: - - # Filter, if 4th argument is present in the tuple - filterFunction = values[3:] - if filterFunction and not filterFunction[0](services, hosts): - continue - - userNameConfig, userNameProperty, hostSelectorMap = values[:3] - user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None) - if user: - usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap) - - for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems(): - proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty} - for proxyPropertyName, hostSelector in hostSelectorMap.iteritems(): - componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*' - if isinstance(hostSelector, (list, tuple)): - _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values - for component in hostSelector: - componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts) - if componentHosts is not None: - for componentHost in componentHosts: - componentHostName = componentHost["Hosts"]["host_name"] - componentHostNames.add(componentHostName) - - componentHostNamesString = ",".join(sorted(componentHostNames)) - Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString)) - - if not proxyPropertyName in proxyUsers: - proxyUsers[proxyPropertyName] = componentHostNamesString - - if not user in resultUsers: - resultUsers[user] = proxyUsers - - return resultUsers - - def getHadoopProxyUsers(self, services, hosts, configurations): - """ - Gets Hadoop Proxy User recommendations based on the configuration that is provided by - getServiceHadoopProxyUsersConfigurationDict. - - See getServiceHadoopProxyUsersConfigurationDict - """ - servicesList = self.get_services_list(services) - users = {} - - for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems(): - users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations)) - - return users - - def get_data_for_proxyuser(self, user_name, services, configurations, groups=False): - """ - Returns values of proxyuser properties for given user. Properties can be - hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts - :param user_name: - :param services: - :param groups: if true, will return values for group property, not hosts - :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was * - """ - if "core-site" in services["configurations"]: - coreSite = services["configurations"]["core-site"]['properties'] - else: - coreSite = {} - if groups: - property_name = "hadoop.proxyuser.{0}.groups".format(user_name) - else: - property_name = "hadoop.proxyuser.{0}.hosts".format(user_name) - if property_name in coreSite: - property_value = coreSite[property_name] - if property_value == "*": - return True, set() - else: - result_values = set() - if "core-site" in configurations: - if property_name in configurations["core-site"]['properties']: - result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(",")) - result_values = result_values.union(property_value.split(",")) - return False, result_values - return False, set() - - def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None): - is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups) - result_value = "*" - result_values_set = self.merge_proxyusers_values(current_value, value) - if len(result_values_set) > 0: - result_value = ",".join(sorted([val for val in result_values_set if val])) - - if is_groups: - property_name = "hadoop.proxyuser.{0}.groups".format(user_name) - else: - property_name = "hadoop.proxyuser.{0}.hosts".format(user_name) - - put_function(property_name, result_value) - - def merge_proxyusers_values(self, first, second): - result = set() - def append(data): - if isinstance(data, str) or isinstance(data, unicode): - if data != "*": - result.update(data.split(",")) - else: - result.update(data) - append(first) - append(second) - return result - - def getServiceHadoopProxyUsersConfigurationDict(self): - """ - Returns a map that is used by 'getHadoopProxyUsers' to determine service - user properties and related components and get proxyuser recommendations. - This method can be overridden in stackadvisors for the further stacks to - add additional services or change the previous logic. - - Example of the map format: - { - "serviceName": [ - ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"}) - ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}), - ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction) - ], - "serviceName2": [ - ... - } - - If the third element of a tuple is map that maps proxy property to it's value. - The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional) - If the map value is a string, then this string will be used for the proxyuser - value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*'). - Otherwise map value should be alist or a tuple with component names. - All hosts with the provided components will be added - to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3') - - The forth element of the tuple is optional and if it's provided, - it should be a function that takes two arguments: services and hosts. - If it returns False, proxyusers for the tuple will not be added. - """ - ALL_WILDCARD = "*" - HOSTS_PROPERTY = "propertyHosts" - GROUPS_PROPERTY = "propertyGroups" - - return { - "HDFS": [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})], - "OOZIE": [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})], - "HIVE": [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}), - ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})], - "YARN": [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)], - "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})], - "SPARK": [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})] - } - - def recommendHadoopProxyUsers(self, configurations, services, hosts): - servicesList = self.get_services_list(services) - - if 'forced-configurations' not in services: - services["forced-configurations"] = [] - - putCoreSiteProperty = self.putProperty(configurations, "core-site", services) - putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site") - - users = self.getHadoopProxyUsers(services, hosts, configurations) - - # Force dependencies for HIVE - if "HIVE" in servicesList: - hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None) - if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None): - services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)}) - - for user_name, user_properties in users.iteritems(): - - # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users - self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty) - Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"])) - if "propertyGroups" in user_properties: - self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty) - - # Remove old properties if user was renamed - userOldValue = getOldValue(self, services, user_properties["config"], user_properties["propertyName"]) - if userOldValue is not None and userOldValue != user_name: - putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true') - services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)}) - services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)}) - - if "propertyGroups" in user_properties: - putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true') - services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)}) - services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)}) - - self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute) - - def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations): - validationItems = [] - users = self.getHadoopProxyUsers(services, hosts, configurations) - for user_name, user_properties in users.iteritems(): - props = ["hadoop.proxyuser.{0}.hosts".format(user_name)] - if "propertyGroups" in user_properties: - props.append("hadoop.proxyuser.{0}.groups".format(user_name)) - - for prop in props: - validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)}) - - return validationItems - def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts): putHDFSProperty = self.putProperty(configurations, "hadoop-env", services) putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services) @@ -536,7 +339,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): maxFreeVolumeSizeForHost = 0l #kBytes for dataDir in dataDirs: - mp = getMountPointForDir(dataDir, mountPoints) + mp = self.getMountPointForDir(dataDir, mountPoints) for i in range(len(mountPoints)): if mp == mountPoints[i]: if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost: @@ -974,43 +777,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): pass - def getHostNamesWithComponent(self, serviceName, componentName, services): - """ - Returns the list of hostnames on which service component is installed - """ - if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: - service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] - components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] - if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): - componentHostnames = components[0]["StackServiceComponents"]["hostnames"] - return componentHostnames - return [] - - def getHostsWithComponent(self, serviceName, componentName, services, hosts): - if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]: - service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0] - components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName] - if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0): - componentHostnames = components[0]["StackServiceComponents"]["hostnames"] - componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames] - return componentHosts - return [] - - def getHostWithComponent(self, serviceName, componentName, services, hosts): - componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts) - if (len(componentHosts) > 0): - return componentHosts[0] - return None - - def getHostComponentsByCategories(self, hostname, categories, services, hosts): - components = [] - if services is not None and hosts is not None: - for service in services["services"]: - components.extend([componentEntry for componentEntry in service["components"] - if componentEntry["StackServiceComponents"]["component_category"] in categories - and hostname in componentEntry["StackServiceComponents"]["hostnames"]]) - return components - # TODO, move this to a generic stack advisor. def getZKHostPortString(self, services, include_port=True): """ @@ -1312,8 +1078,8 @@ class HDP206StackAdvisor(DefaultStackAdvisor): mountPoints = [] for mountPoint in host["Hosts"]["disk_info"]: mountPoints.append(mountPoint["mountpoint"]) - hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints) - hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints) + hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints) + hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints) preferred_mountpoints = self.getPreferredMountPoints(host['Hosts']) # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition # if multiple preferred_mountpoints exist @@ -1331,7 +1097,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if dn_hosts and collectorHostName in dn_hosts and ams_site and \ dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs): for dfs_datadir in dfs_datadirs: - dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints) + dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints) if dfs_datadir_mountpoint == hbase_rootdir_mountpoint: item = self.getWarnItem("Consider not using {0} partition for storing metrics data. " "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint)) @@ -1383,10 +1149,10 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if logDirItem: validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}]) - hbase_master_heapsize = to_number(properties["hbase_master_heapsize"]) - hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"]) - hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"]) - hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"]) + hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"]) + hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"]) + hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"]) + hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"]) # Validate Xmn settings. masterXmnItem = None @@ -1476,14 +1242,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor): heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize" xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size" - hbase_needs_increase = to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb + hbase_needs_increase = self.to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb if unusedMemory > 4*gb and hbase_needs_increase: # warn user, if more than 4GB RAM is unused - recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + to_number(properties.get(heapPropertyToIncrease))*mb + recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + self.to_number(properties.get(heapPropertyToIncrease))*mb recommended_hbase_heapsize = min(32*gb, recommended_hbase_heapsize) #Make sure heapsize <= 32GB recommended_hbase_heapsize = round_to_n(recommended_hbase_heapsize/mb,128) # Round to 128m multiple - if to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize: + if self.to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize: hbaseHeapsizeItem = self.getWarnItem("Consider allocating {0} MB to {1} in ams-hbase-env to use up some " "unused memory on host" .format(recommended_hbase_heapsize, @@ -1491,7 +1257,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): validationItems.extend([{"config-name": heapPropertyToIncrease, "item": hbaseHeapsizeItem}]) recommended_xmn_size = round_to_n(0.15*recommended_hbase_heapsize,128) - if to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size: + if self.to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size: xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory " "on host".format(recommended_xmn_size)) validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}]) @@ -1505,7 +1271,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): mb = 1024 * 1024 gb = 1024 * mb validationItems = [] - collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize")) + collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize")) amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") for collectorHostName in amsCollectorHosts: for host in hosts["items"]: @@ -1550,13 +1316,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if len(heapsize) > 1 and heapsize[-1] in '0123456789': heapsize = str(heapsize) + "m" - totalMemoryRequired += formatXmxSizeToBytes(heapsize) + totalMemoryRequired += self.formatXmxSizeToBytes(heapsize) else: if component == "METRICS_MONITOR" or "CLIENT" in component: heapsize = '512m' else: heapsize = '1024m' - totalMemoryRequired += formatXmxSizeToBytes(heapsize) + totalMemoryRequired += self.formatXmxSizeToBytes(heapsize) return totalMemoryRequired def getPreferredMountPoints(self, hostInfo): @@ -1573,130 +1339,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor): mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or mountpoint["type"] in undesirableFsTypes or mountpoint["available"] == str(0)): - mountPointsDict[mountpoint["mountpoint"]] = to_number(mountpoint["available"]) + mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"]) if mountPointsDict: mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True) mountPoints.append("/") return mountPoints - def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo): - if not propertyName in properties: - return self.getErrorItem("Value should be set") - dir = properties[propertyName] - if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName): - return None - - dir = re.sub("^file://", "", dir, count=1) - mountPoints = [] - for mountPoint in hostInfo["disk_info"]: - mountPoints.append(mountPoint["mountpoint"]) - mountPoint = getMountPointForDir(dir, mountPoints) - - if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint: - return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName)) - - return None - - def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace): - if not propertyName in properties: - return self.getErrorItem("Value should be set") - dir = properties[propertyName] - if not dir.startswith("file://"): - return None - - dir = re.sub("^file://", "", dir, count=1) - mountPoints = {} - for mountPoint in hostInfo["disk_info"]: - mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"]) - mountPoint = getMountPointForDir(dir, mountPoints.keys()) - - if not mountPoints: - return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"]) - - if mountPoints[mountPoint] < reqiuredDiskSpace: - msg = "Ambari Metrics disk space requirements not met. \n" \ - "Recommended disk space for partition {0} is {1}G" - return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb - return None - - def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName): - if propertyName not in recommendedDefaults: - # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the - # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it. - return None - - if not propertyName in properties: - return self.getErrorItem("Value should be set") - value = to_number(properties[propertyName]) - if value is None: - return self.getErrorItem("Value should be integer") - defaultValue = to_number(recommendedDefaults[propertyName]) - if defaultValue is None: - return None - if value < defaultValue: - return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue)) - return None - - def validatorEqualsPropertyItem(self, properties1, propertyName1, - properties2, propertyName2, - emptyAllowed=False): - if not propertyName1 in properties1: - return self.getErrorItem("Value should be set for %s" % propertyName1) - if not propertyName2 in properties2: - return self.getErrorItem("Value should be set for %s" % propertyName2) - value1 = properties1.get(propertyName1) - if value1 is None and not emptyAllowed: - return self.getErrorItem("Empty value for %s" % propertyName1) - value2 = properties2.get(propertyName2) - if value2 is None and not emptyAllowed: - return self.getErrorItem("Empty value for %s" % propertyName2) - if value1 != value2: - return self.getWarnItem("It is recommended to set equal values " - "for properties {0} and {1}".format(propertyName1, propertyName2)) - - return None - - def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults, - propertyName): - if not propertyName in properties: - return self.getErrorItem("Value should be set for %s" % propertyName) - value = properties.get(propertyName) - if not propertyName in recommendedDefaults: - return self.getErrorItem("Value should be recommended for %s" % propertyName) - recommendedValue = recommendedDefaults.get(propertyName) - if value != recommendedValue: - return self.getWarnItem("It is recommended to set value {0} " - "for property {1}".format(recommendedValue, propertyName)) - return None - - def validatorNotEmpty(self, properties, propertyName): - if not propertyName in properties: - return self.getErrorItem("Value should be set for {0}".format(propertyName)) - value = properties.get(propertyName) - if not value: - return self.getWarnItem("Empty value for {0}".format(propertyName)) - return None - - def validateMinMemorySetting(self, properties, defaultValue, propertyName): - if not propertyName in properties: - return self.getErrorItem("Value should be set") - if defaultValue is None: - return self.getErrorItem("Config's default value can't be null or undefined") - - value = properties[propertyName] - if value is None: - return self.getErrorItem("Value can't be null or undefined") - try: - valueInt = to_number(value) - # TODO: generify for other use cases - defaultValueInt = int(str(defaultValue).strip()) - if valueInt < defaultValueInt: - return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue)) - except: - return None - - return None - + # TODO, move to YARN Service Advisor def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services): if propertyName not in properties: return self.getErrorItem("Value should be set") @@ -1712,6 +1361,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return None + # TODO, move to YARN Service Advisor def recommendYarnQueue(self, services, catalog_name=None, queue_property=None): old_queue_name = None @@ -1737,15 +1387,15 @@ class HDP206StackAdvisor(DefaultStackAdvisor): defaultValue = recommendedDefaults[propertyName] if defaultValue is None: return self.getErrorItem("Config's default value can't be null or undefined") - if not checkXmxValueFormat(value) and checkXmxValueFormat(defaultValue): + if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue): # Xmx is in the default-value but not the value, should be an error return self.getErrorItem('Invalid value format') - if not checkXmxValueFormat(defaultValue): + if not self.checkXmxValueFormat(defaultValue): # if default value does not contain Xmx, then there is no point in validating existing value return None - valueInt = formatXmxSizeToBytes(getXmxSize(value)) - defaultValueXmx = getXmxSize(defaultValue) - defaultValueInt = formatXmxSizeToBytes(defaultValueXmx) + valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value)) + defaultValueXmx = self.getXmxSize(defaultValue) + defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx) if valueInt < defaultValueInt: return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx) return None @@ -1837,24 +1487,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return dataNodeHosts return [] - # TODO, move to Service Advisors. - def getComponentLayoutSchemes(self): - return { - 'NAMENODE': {"else": 0}, - 'SECONDARY_NAMENODE': {"else": 1}, - 'HBASE_MASTER': {6: 0, 31: 2, "else": 3}, - - 'HISTORYSERVER': {31: 1, "else": 2}, - 'RESOURCEMANAGER': {31: 1, "else": 2}, - - 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3}, - - 'HIVE_SERVER': {6: 1, 31: 2, "else": 4}, - 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4}, - 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4}, - 'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5}, - } - def get_system_min_uid(self): login_defs = '/etc/login.defs' uid_min_tag = 'UID_MIN' @@ -2038,17 +1670,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): "security_enabled" in services["configurations"]["cluster-env"]["properties"] and\ services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true" - def get_services_list(self, services): - """ - Returns available services as list - - :type services dict - :rtype list - """ - if not services: - return [] - return [service["StackServices"]["service_name"] for service in services["services"]] def get_components_list(self, service, services): """ @@ -2103,88 +1725,7 @@ def getServicesSiteProperties(services, siteName): return None return siteConfig.get("properties") -def to_number(s): - try: - return int(re.sub("\D", "", s)) - except ValueError: - return None -def checkXmxValueFormat(value): - p = re.compile('-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?') - matches = p.findall(value) - return len(matches) == 1 - -def getXmxSize(value): - p = re.compile("-Xmx(\d+)(.?)") - result = p.findall(value)[0] - if len(result) > 1: - # result[1] - is a space or size formatter (b|k|m|g etc) - return result[0] + result[1].lower() - return result[0] - -def formatXmxSizeToBytes(value): - value = value.lower() - if len(value) == 0: - return 0 - modifier = value[-1] - - if modifier == ' ' or modifier in "0123456789": - modifier = 'b' - m = { - modifier == 'b': 1, - modifier == 'k': 1024, - modifier == 'm': 1024 * 1024, - modifier == 'g': 1024 * 1024 * 1024, - modifier == 't': 1024 * 1024 * 1024 * 1024, - modifier == 'p': 1024 * 1024 * 1024 * 1024 * 1024 - }[1] - return to_number(value) * m - -def getPort(address): - """ - Extracts port from the address like 0.0.0.0:1019 - """ - if address is None: - return None - m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address) - if m is not None: - return int(m.group(2)) - else: - return None - -def isSecurePort(port): - """ - Returns True if port is root-owned at *nix systems - """ - if port is not None: - return port < 1024 - else: - return False - -def getMountPointForDir(dir, mountPoints): - """ - :param dir: Directory to check, even if it doesn't exist. - :return: Returns the closest mount point as a string for the directory. - if the "dir" variable is None, will return None. - If the directory does not exist, will return "/". - """ - bestMountFound = None - if dir: - dir = re.sub("^file://", "", dir, count=1).strip().lower() - - # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be - # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data". - # So take the one with the greatest number of segments. - for mountPoint in mountPoints: - # Ensure that the mount path and the dir path ends with "/" - # The mount point "/hadoop" should not match with the path "/hadoop1" - if os.path.join(dir, "").startswith(os.path.join(mountPoint, "")): - if bestMountFound is None: - bestMountFound = mountPoint - elif os.path.join(bestMountFound, "").count(os.path.sep) < os.path.join(mountPoint, "").count(os.path.sep): - bestMountFound = mountPoint - - return bestMountFound def round_to_n(mem_size, n=128): return int(round(mem_size / float(n))) * int(n) http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py index b275b00..4822732 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py @@ -32,6 +32,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor): self.modifyHeapSizeProperties() self.modifyNotValuableComponents() self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() def modifyMastersWithMultipleInstances(self): """ @@ -74,6 +75,20 @@ class HDP21StackAdvisor(HDP206StackAdvisor): """ self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS', 'GANGLIA_SERVER', 'METRICS_COLLECTOR']) + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + + Must be overriden in child class. + """ + self.componentLayoutSchemes.update({ + 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, + 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} + }) + def getServiceConfigurationRecommenderDict(self): parentRecommendConfDict = super(HDP21StackAdvisor, self).getServiceConfigurationRecommenderDict() childRecommendConfDict = { @@ -210,16 +225,6 @@ class HDP21StackAdvisor(HDP206StackAdvisor): if recommended_tez_queue is not None: putTezProperty("tez.queue.name", recommended_tez_queue) - # TODO, move to Service Advisors. - def getComponentLayoutSchemes(self): - parentSchemes = super(HDP21StackAdvisor, self).getComponentLayoutSchemes() - childSchemes = { - 'APP_TIMELINE_SERVER': {31: 1, "else": 2}, - 'FALCON_SERVER': {6: 1, 31: 2, "else": 3} - } - parentSchemes.update(childSchemes) - return parentSchemes - def getServiceConfigurationValidators(self): parentValidators = super(HDP21StackAdvisor, self).getServiceConfigurationValidators() childValidators = { @@ -233,10 +238,10 @@ class HDP21StackAdvisor(HDP206StackAdvisor): validationItems = [ {"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')}, {"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')}, {"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ] - yarnSiteProperties = getSiteProperties(configurations, "yarn-site") + yarnSiteProperties = self.getSiteProperties(configurations, "yarn-site") if yarnSiteProperties: - yarnSchedulerMaximumAllocationMb = to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"]) - hiveTezContainerSize = to_number(properties['hive.tez.container.size']) + yarnSchedulerMaximumAllocationMb = self.to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"]) + hiveTezContainerSize = self.to_number(properties['hive.tez.container.size']) if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb: validationItems.append({"config-name": 'hive.tez.container.size', "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")}) return self.toConfigurationValidationProblems(validationItems, "hive-site") http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py index 8980398..f108622 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py @@ -30,6 +30,7 @@ import xml.etree.ElementTree as ET # Local Imports from resource_management.core.logger import Logger + class HDP22StackAdvisor(HDP21StackAdvisor): def __init__(self): @@ -1202,23 +1203,23 @@ class HDP22StackAdvisor(HDP21StackAdvisor): {"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ] if 'mapreduce.map.java.opts' in properties and \ - checkXmxValueFormat(properties['mapreduce.map.java.opts']): - mapreduceMapJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024) - mapreduceMapMemoryMb = to_number(properties['mapreduce.map.memory.mb']) + self.checkXmxValueFormat(properties['mapreduce.map.java.opts']): + mapreduceMapJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024) + mapreduceMapMemoryMb = self.to_number(properties['mapreduce.map.memory.mb']) if mapreduceMapJavaOpts > mapreduceMapMemoryMb: validationItems.append({"config-name": 'mapreduce.map.java.opts', "item": self.getWarnItem("mapreduce.map.java.opts Xmx should be less than mapreduce.map.memory.mb ({0})".format(mapreduceMapMemoryMb))}) if 'mapreduce.reduce.java.opts' in properties and \ - checkXmxValueFormat(properties['mapreduce.reduce.java.opts']): - mapreduceReduceJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024) - mapreduceReduceMemoryMb = to_number(properties['mapreduce.reduce.memory.mb']) + self.checkXmxValueFormat(properties['mapreduce.reduce.java.opts']): + mapreduceReduceJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024) + mapreduceReduceMemoryMb = self.to_number(properties['mapreduce.reduce.memory.mb']) if mapreduceReduceJavaOpts > mapreduceReduceMemoryMb: validationItems.append({"config-name": 'mapreduce.reduce.java.opts', "item": self.getWarnItem("mapreduce.reduce.java.opts Xmx should be less than mapreduce.reduce.memory.mb ({0})".format(mapreduceReduceMemoryMb))}) if 'yarn.app.mapreduce.am.command-opts' in properties and \ - checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']): - yarnAppMapreduceAmCommandOpts = formatXmxSizeToBytes(getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024) - yarnAppMapreduceAmResourceMb = to_number(properties['yarn.app.mapreduce.am.resource.mb']) + self.checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']): + yarnAppMapreduceAmCommandOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024) + yarnAppMapreduceAmResourceMb = self.to_number(properties['yarn.app.mapreduce.am.resource.mb']) if yarnAppMapreduceAmCommandOpts > yarnAppMapreduceAmResourceMb: validationItems.append({"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.getWarnItem("yarn.app.mapreduce.am.command-opts Xmx should be less than yarn.app.mapreduce.am.resource.mb ({0})".format(yarnAppMapreduceAmResourceMb))}) @@ -1283,7 +1284,7 @@ class HDP22StackAdvisor(HDP21StackAdvisor): for address_property in address_properties: if address_property in hdfs_site: value = hdfs_site[address_property] - if not is_valid_host_port_authority(value): + if not self.is_valid_host_port_authority(value): validationItems.append({"config-name" : address_property, "item" : self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)}) @@ -1312,15 +1313,15 @@ class HDP22StackAdvisor(HDP21StackAdvisor): data_transfer_protection = 'dfs.data.transfer.protection' try: # Params may be absent - privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address])) + privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address])) except KeyError: privileged_dfs_dn_port = False try: - privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address])) + privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address])) except KeyError: privileged_dfs_http_port = False try: - privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address])) + privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address])) except KeyError: privileged_dfs_https_port = False try: @@ -1770,15 +1771,3 @@ def is_number(s): pass return False - -def is_valid_host_port_authority(target): - has_scheme = "://" in target - if not has_scheme: - target = "dummyscheme://"+target - try: - result = urlparse(target) - if result.hostname is not None and result.port is not None: - return True - except ValueError: - pass - return False