Repository: ambari Updated Branches: refs/heads/trunk 8e5eeb4de -> 4ce716f8e
http://git-wip-us.apache.org/repos/asf/ambari/blob/4ce716f8/ambari-server/src/main/resources/stacks/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py index 9979e7e..00b9d79 100644 --- a/ambari-server/src/main/resources/stacks/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/stack_advisor.py @@ -309,6 +309,8 @@ class DefaultStackAdvisor(StackAdvisor): implement """ + services = None + """ Filters the list of specified hosts object and returns a list of hosts which are not in maintenance mode. @@ -342,6 +344,7 @@ class DefaultStackAdvisor(StackAdvisor): return recommendations def createComponentLayoutRecommendations(self, services, hosts): + self.services = services recommendations = { "blueprint": { @@ -365,27 +368,14 @@ class DefaultStackAdvisor(StackAdvisor): #extend hostsComponentsMap' with MASTER components for service in services["services"]: masterComponents = [component for component in service["components"] if self.isMasterComponent(component)] + serviceAdvisor = self.instantiateServiceAdvisor(service) for component in masterComponents: componentName = component["StackServiceComponents"]["component_name"] - - if self.isComponentHostsPopulated(component): - hostsForComponent = component["StackServiceComponents"]["hostnames"] + hostsForComponent = [] + if serviceAdvisor is None: + hostsForComponent = self.getHostsForMasterComponent(services, hosts, component, hostsList, hostsComponentsMap) else: - - if len(hostsList) > 1 and self.isMasterComponentWithMultipleInstances(component): - hostsCount = self.getMinComponentCount(component) - if hostsCount > 1: # get first 'hostsCount' available hosts - hostsForComponent = [] - hostIndex = 0 - while hostsCount > len(hostsForComponent) and hostIndex < len(hostsList): - currentHost = hostsList[hostIndex] - if self.isHostSuitableForComponent(currentHost, component): - hostsForComponent.append(currentHost) - hostIndex += 1 - else: - hostsForComponent = [self.getHostForComponent(component, hostsList)] - else: - hostsForComponent = [self.getHostForComponent(component, hostsList)] + hostsForComponent = serviceAdvisor.getHostsForMasterComponent(self, services, hosts, component, hostsList, hostsComponentsMap) #extend 'hostsComponentsMap' with 'hostsForComponent' for hostName in hostsForComponent: @@ -402,41 +392,14 @@ class DefaultStackAdvisor(StackAdvisor): for service in services["services"]: slaveClientComponents = [component for component in service["components"] if self.isSlaveComponent(component) or self.isClientComponent(component)] + serviceAdvisor = self.instantiateServiceAdvisor(service) for component in slaveClientComponents: componentName = component["StackServiceComponents"]["component_name"] - - if component["StackServiceComponents"]["cardinality"] == "ALL": - hostsForComponent = hostsList + hostsForComponent = [] + if serviceAdvisor is None: + hostsForComponent = self.getHostsForSlaveComponent(services, hosts, component, hostsList, hostsComponentsMap, freeHosts) else: - componentIsPopulated = self.isComponentHostsPopulated(component) - if componentIsPopulated: - hostsForComponent = component["StackServiceComponents"]["hostnames"] - else: - hostsForComponent = [] - - if self.isSlaveComponent(component): - cardinality = str(component["StackServiceComponents"]["cardinality"]) - if self.isComponentUsingCardinalityForLayout(componentName) and cardinality: - # cardinality types: 1+, 1-2, 1, ALL - if "+" in cardinality: - hostsMin = int(cardinality[:-1]) - elif "-" in cardinality: - nums = cardinality.split("-") - hostsMin = int(nums[0]) - else: - hostsMin = int(cardinality) - if hostsMin > len(hostsForComponent): - hostsForComponent.extend(freeHosts[0:hostsMin-len(hostsForComponent)]) - # Components which are already installed, keep the recommendation as the existing layout - elif not componentIsPopulated: - hostsForComponent.extend(freeHosts) - if not hostsForComponent: # hostsForComponent is empty - hostsForComponent = hostsList[-1:] - hostsForComponent = list(set(hostsForComponent)) # removing duplicates - elif self.isClientComponent(component) and not componentIsPopulated: - hostsForComponent = freeHosts[0:1] - if not hostsForComponent: # hostsForComponent is empty - hostsForComponent = hostsList[-1:] + hostsForComponent = serviceAdvisor.getHostsForSlaveComponent(self, services, hosts, component, hostsList, hostsComponentsMap, freeHosts) #extend 'hostsComponentsMap' with 'hostsForComponent' for hostName in hostsForComponent: @@ -445,6 +408,13 @@ class DefaultStackAdvisor(StackAdvisor): if hostName in hostsSet: hostsComponentsMap[hostName].append( { "name": componentName } ) + #colocate custom services + for service in services["services"]: + serviceAdvisor = self.instantiateServiceAdvisor(service) + if serviceAdvisor is not None: + serviceComponents = [component for component in service["components"]] + serviceAdvisor.colocateService(self, hostsComponentsMap, serviceComponents) + #prepare 'host-group's from 'hostsComponentsMap' host_groups = recommendations["blueprint"]["host_groups"] bindings = recommendations["blueprint_cluster_binding"]["host_groups"] @@ -458,7 +428,110 @@ class DefaultStackAdvisor(StackAdvisor): return recommendations pass + def getHostsForMasterComponent(self, services, hosts, component, hostsList, hostsComponentsMap): + componentName = component["StackServiceComponents"]["component_name"] + + if self.isComponentHostsPopulated(component): + return component["StackServiceComponents"]["hostnames"] + + if len(hostsList) > 1 and self.isMasterComponentWithMultipleInstances(component): + hostsCount = self.getMinComponentCount(component) + if hostsCount > 1: # get first 'hostsCount' available hosts + hostsForComponent = [] + hostIndex = 0 + while hostsCount > len(hostsForComponent) and hostIndex < len(hostsList): + currentHost = hostsList[hostIndex] + if self.isHostSuitableForComponent(currentHost, component): + hostsForComponent.append(currentHost) + hostIndex += 1 + return hostsForComponent + + return [self.getHostForComponent(component, hostsList)] + + def getHostsForSlaveComponent(self, services, hosts, component, hostsList, hostsComponentsMap, freeHosts): + componentName = component["StackServiceComponents"]["component_name"] + + if component["StackServiceComponents"]["cardinality"] == "ALL": + return hostsList + + componentIsPopulated = self.isComponentHostsPopulated(component) + if componentIsPopulated: + return component["StackServiceComponents"]["hostnames"] + + hostsForComponent = [] + + if self.isSlaveComponent(component): + cardinality = str(component["StackServiceComponents"]["cardinality"]) + if self.isComponentUsingCardinalityForLayout(component) and cardinality: + # cardinality types: 1+, 1-2, 1 + if "+" in cardinality: + hostsMin = int(cardinality[:-1]) + elif "-" in cardinality: + nums = cardinality.split("-") + hostsMin = int(nums[0]) + else: + hostsMin = int(cardinality) + if hostsMin > len(hostsForComponent): + hostsForComponent.extend(freeHosts[0:hostsMin-len(hostsForComponent)]) + + else: + hostsForComponent.extend(freeHosts) + if not hostsForComponent: # hostsForComponent is empty + hostsForComponent = hostsList[-1:] + hostsForComponent = list(set(hostsForComponent)) # removing duplicates + elif self.isClientComponent(component): + hostsForComponent = freeHosts[0:1] + if not hostsForComponent: # hostsForComponent is empty + hostsForComponent = hostsList[-1:] + + return hostsForComponent + + def instantiateServiceAdvisorForComponent(self, componentName): + if self.services is None: + return None + + for service in self.services["services"]: + for component in service["components"]: + if componentName == self.getComponentName(component): + return self.instantiateServiceAdvisor(service) + + return None + + def instantiateServiceAdvisor(self, service): + import imp + import os + import traceback + + serviceName = service["StackServices"]["service_name"] + className = service["StackServices"]["advisor_name"] if "advisor_name" in service["StackServices"] else None + path = service["StackServices"]["advisor_path"] if "advisor_path" in service["StackServices"] else None + + if path is None or className is None: + return None + + if not os.path.exists(path): + return None + + try: + serviceAdvisor = None + with open(path, 'rb') as fp: + serviceAdvisor = imp.load_module('service_advisor_impl', fp, path, ('.py', 'rb', imp.PY_SOURCE)) + if hasattr(serviceAdvisor, className): + print "ServiceAdvisor implementation for service {0} was loaded".format(serviceName) + clazz = getattr(serviceAdvisor, className) + return clazz() + + except Exception as e: + traceback.print_exc() + print "Failed to load or create ServiceAdvisor implementation for service {0}".format(serviceName) + + return None + def isComponentUsingCardinalityForLayout(self, componentName): + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.isComponentUsingCardinalityForLayout(componentName) + return False def createValidationResponse(self, services, validationItems): @@ -480,17 +553,116 @@ class DefaultStackAdvisor(StackAdvisor): def validateConfigurations(self, services, hosts): """Returns array of Validation objects about issues with hostnames components assigned to""" + self.services = services + validationItems = self.getConfigurationsValidationItems(services, hosts) return self.createValidationResponse(services, validationItems) def getComponentLayoutValidations(self, services, hosts): - return [] + self.services = services + + items = [] + if services is None: + return items + + for service in services["services"]: + advisor = self.instantiateServiceAdvisor(service) + if advisor is not None: + items.extend(advisor.getComponentLayoutValidations(self, services, hosts)) + + return items def getConfigurationClusterSummary(self, servicesList, hosts, components, services): pass + def validateClusterConfigurations(self, configurations, services, hosts): + validationItems = [] + + return self.toConfigurationValidationProblems(validationItems, "") + + def toConfigurationValidationProblems(self, validationProblems, siteName): + result = [] + for validationProblem in validationProblems: + validationItem = validationProblem.get("item", None) + if validationItem is not None: + problem = {"type": 'configuration', "level": validationItem["level"], "message": validationItem["message"], + "config-type": siteName, "config-name": validationProblem["config-name"] } + result.append(problem) + return result + + def validateServiceConfigurations(self, serviceName): + return self.getServiceConfigurationValidators().get(serviceName, None) + + def getServiceConfigurationValidators(self): + return {} + + def validateMinMax(self, items, recommendedDefaults, configurations): + + # required for casting to the proper numeric type before comparison + def convertToNumber(number): + try: + return int(number) + except ValueError: + return float(number) + + for configName in configurations: + validationItems = [] + if configName in recommendedDefaults and "property_attributes" in recommendedDefaults[configName]: + for propertyName in recommendedDefaults[configName]["property_attributes"]: + if propertyName in configurations[configName]["properties"]: + if "maximum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + maxValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["maximum"]) + if userValue > maxValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is greater than the recommended maximum of {0} ".format(maxValue))}]) + if "minimum" in recommendedDefaults[configName]["property_attributes"][propertyName] and \ + propertyName in recommendedDefaults[configName]["properties"]: + userValue = convertToNumber(configurations[configName]["properties"][propertyName]) + minValue = convertToNumber(recommendedDefaults[configName]["property_attributes"][propertyName]["minimum"]) + if userValue < minValue: + validationItems.extend([{"config-name": propertyName, "item": self.getWarnItem("Value is less than the recommended minimum of {0} ".format(minValue))}]) + items.extend(self.toConfigurationValidationProblems(validationItems, configName)) + pass + + def getConfigurationsValidationItems(self, services, hosts): - return [] + """Returns array of Validation objects about issues with configuration values provided in services""" + items = [] + + recommendations = self.recommendConfigurations(services, hosts) + recommendedDefaults = recommendations["recommendations"]["blueprint"]["configurations"] + configurations = services["configurations"] + + for service in services["services"]: + items.extend(self.getConfigurationsValidationItemsForService(configurations, recommendedDefaults, service, services, hosts)) + + clusterWideItems = self.validateClusterConfigurations(configurations, services, hosts) + items.extend(clusterWideItems) + self.validateMinMax(items, recommendedDefaults, configurations) + return items + + def getConfigurationsValidationItemsForService(self, configurations, recommendedDefaults, service, services, hosts): + items = [] + serviceName = service["StackServices"]["service_name"] + validator = self.validateServiceConfigurations(serviceName) + if validator is not None: + for siteName, method in validator.items(): + if siteName in recommendedDefaults: + siteProperties = getSiteProperties(configurations, siteName) + if siteProperties is not None: + siteRecommendations = recommendedDefaults[siteName]["properties"] + print("SiteName: %s, method: %s\n" % (siteName, method.__name__)) + print("Site properties: %s\n" % str(siteProperties)) + print("Recommendations: %s\n********\n" % str(siteRecommendations)) + resultItems = method(siteProperties, siteRecommendations, configurations, services, hosts) + items.extend(resultItems) + + advisor = self.instantiateServiceAdvisor(service) + if advisor is not None: + items.extend(advisor.getConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts)) + + return items def recommendConfigGroupsConfigurations(self, recommendations, services, components, hosts, servicesList): @@ -519,10 +691,15 @@ class DefaultStackAdvisor(StackAdvisor): configurations = {} - for service in servicesList: - calculation = self.getServiceConfigurationRecommender(service) + for service in services["services"]: + serviceName = service["StackServices"]["service_name"] + calculation = self.getServiceConfigurationRecommender(serviceName) if calculation is not None: calculation(configurations, cgClusterSummary, cgServices, cgHosts) + else: + advisor = self.instantiateServiceAdvisor(service) + if advisor is not None: + advisor.getServiceConfigurationRecommendations(self, configuration, cgClusterSummary, cgServices, cgHosts) cgRecommendation = { "configurations": {}, @@ -551,6 +728,8 @@ class DefaultStackAdvisor(StackAdvisor): configElement][property] = value def recommendConfigurations(self, services, hosts): + self.services = services + stackName = services["Versions"]["stack_name"] stackVersion = services["Versions"]["stack_version"] hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] @@ -583,10 +762,15 @@ class DefaultStackAdvisor(StackAdvisor): else: configurations = recommendations["recommendations"]["blueprint"]["configurations"] - for service in servicesList: - calculation = self.getServiceConfigurationRecommender(service) + for service in services["services"]: + serviceName = service["StackServices"]["service_name"] + calculation = self.getServiceConfigurationRecommender(serviceName) if calculation is not None: calculation(configurations, clusterSummary, services, hosts) + else: + advisor = self.instantiateServiceAdvisor(service) + if advisor is not None: + advisor.getServiceConfigurationRecommendations(self, configurations, clusterSummary, services, hosts) return recommendations @@ -623,11 +807,19 @@ class DefaultStackAdvisor(StackAdvisor): def isMasterComponentWithMultipleInstances(self, component): componentName = self.getComponentName(component) + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.isMasterComponentWithMultipleInstances(componentName) + masters = self.getMastersWithMultipleInstances() return componentName in masters def isComponentNotValuable(self, component): componentName = self.getComponentName(component) + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.isComponentNotValuable(componentName) + service = self.getNotValuableComponents() return componentName in service @@ -637,6 +829,10 @@ class DefaultStackAdvisor(StackAdvisor): # Helper dictionaries def getComponentCardinality(self, componentName): + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.getComponentCardinality(componentName) + return self.getCardinalitiesDict().get(componentName, {"min": 1, "max": 1}) def getHostForComponent(self, component, hostsList): @@ -660,6 +856,10 @@ class DefaultStackAdvisor(StackAdvisor): """ Provides a scheme for laying out given component on different number of hosts. """ + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.getComponentLayoutScheme(componentName) + return self.getComponentLayoutSchemes().get(componentName, None) def getComponentName(self, component): @@ -667,6 +867,10 @@ class DefaultStackAdvisor(StackAdvisor): def isComponentNotPreferableOnAmbariServerHost(self, component): componentName = self.getComponentName(component) + serviceAdvisor = self.instantiateServiceAdvisorForComponent(componentName) + if serviceAdvisor is not None: + return serviceAdvisor.isComponentNotPreferableOnAmbariServerHost(componentName) + service = self.getNotPreferableOnServerComponents() return componentName in service @@ -787,3 +991,9 @@ class DefaultStackAdvisor(StackAdvisor): return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")] return cmp(normalize(version1), normalize(version2)) pass + +def getSiteProperties(configurations, siteName): + siteConfig = configurations.get(siteName) + if siteConfig is None: + return None + return siteConfig.get("properties") http://git-wip-us.apache.org/repos/asf/ambari/blob/4ce716f8/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py index fcb5407..95be86b 100644 --- a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py +++ b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py @@ -87,7 +87,6 @@ class TestHDP23StackAdvisor(TestCase): def test_createComponentLayoutRecommendations_hawq_1_Host(self): - """ Test that HAWQSTANDBY is not recommended on a single node cluster """ services = self.load_json("services-hawq-1-host.json") componentsListList = [service["components"] for service in services["services"]] @@ -99,6 +98,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 1) + self.insertHAWQServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) recommendedComponentsListList = [hostgroup["components"] for hostgroup in recommendations["blueprint"]["host_groups"]] @@ -107,6 +107,19 @@ class TestHDP23StackAdvisor(TestCase): self.assertFalse('HAWQSTANDBY' in recommendedComponents) self.assertTrue('HAWQSEGMENT' in recommendedComponents) + def insertHAWQServiceAdvisorInfo(self, services): + for service in services["services"]: + if service["StackServices"]["service_name"] == 'HAWQ': + service["StackServices"]["advisor_name"] = "HAWQ200ServiceAdvisor" + path = os.path.join(self.testDirectory, '../../../../../main/resources/common-services/HAWQ/2.0.0/service_advisor.py') + service["StackServices"]["advisor_path"] = path + + def insertPXFServiceAdvisorInfo(self, services): + for service in services["services"]: + if service["StackServices"]["service_name"] == 'PXF': + service["StackServices"]["advisor_name"] = "PXF300ServiceAdvisor" + path = os.path.join(self.testDirectory, '../../../../../main/resources/common-services/PXF/3.0.0/service_advisor.py') + service["StackServices"]["advisor_path"] = path def test_createComponentLayoutRecommendations_hawq_3_Hosts(self): """ Test that HAWQSTANDBY is recommended on a 3-node cluster """ @@ -121,9 +134,11 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) - recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) + self.insertHAWQServiceAdvisorInfo(services) + recommendations = self.stackAdvisor.recommendComponentLayout(services, hosts) + layoutRecommendations = recommendations["recommendations"] - recommendedComponentsListList = [hostgroup["components"] for hostgroup in recommendations["blueprint"]["host_groups"]] + recommendedComponentsListList = [hostgroup["components"] for hostgroup in layoutRecommendations["blueprint"]["host_groups"]] recommendedComponents = [item["name"] for sublist in recommendedComponentsListList for item in sublist] self.assertTrue('HAWQMASTER' in recommendedComponents) self.assertTrue('HAWQSTANDBY' in recommendedComponents) @@ -150,9 +165,11 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) - recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) + self.insertHAWQServiceAdvisorInfo(services) + recommendations = self.stackAdvisor.recommendComponentLayout(services, hosts) + layoutRecommendations = recommendations["recommendations"] - recommendedComponentsListList = [hostgroup["components"] for hostgroup in recommendations["blueprint"]["host_groups"]] + recommendedComponentsListList = [hostgroup["components"] for hostgroup in layoutRecommendations["blueprint"]["host_groups"]] recommendedComponents = [item["name"] for sublist in recommendedComponentsListList for item in sublist] self.assertFalse('HAWQMASTER' in recommendedComponents) self.assertFalse('HAWQSTANDBY' in recommendedComponents) @@ -198,6 +215,7 @@ class TestHDP23StackAdvisor(TestCase): } hawqSegmentHosts = set(["c6401.ambari.apache.org", "c6402.ambari.apache.org", "c6403.ambari.apache.org"]) + self.insertHAWQServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "HAWQSEGMENT"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -243,6 +261,7 @@ class TestHDP23StackAdvisor(TestCase): } hawqSegmentHosts = set(["c6401.ambari.apache.org", "c6403.ambari.apache.org"]) + self.insertHAWQServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "HAWQSEGMENT"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -303,6 +322,7 @@ class TestHDP23StackAdvisor(TestCase): } hawqSegmentHosts = set(["c6402.ambari.apache.org"]) + self.insertHAWQServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "HAWQSEGMENT"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -357,6 +377,8 @@ class TestHDP23StackAdvisor(TestCase): } pxfHosts = set(["c6401.ambari.apache.org", "c6402.ambari.apache.org", "c6403.ambari.apache.org"]) + + self.insertPXFServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "PXF"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -411,6 +433,8 @@ class TestHDP23StackAdvisor(TestCase): } pxfHosts = set(["c6401.ambari.apache.org", "c6402.ambari.apache.org", "c6403.ambari.apache.org"]) + + self.insertPXFServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "PXF"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -481,6 +505,8 @@ class TestHDP23StackAdvisor(TestCase): } pxfHosts = set(["c6402.ambari.apache.org"]) + + self.insertPXFServiceAdvisorInfo(services) recommendations = self.stackAdvisor.createComponentLayoutRecommendations(services, hosts) hostGroups = [ hostgroup["name"] for hostgroup in recommendations["blueprint"]["host_groups"] if {"name": "PXF"} in hostgroup["components"] ] hostNames = [ host["fqdn"] for hostgroup in recommendations["blueprint_cluster_binding"]["host_groups"] if hostgroup["name"] in hostGroups for host in hostgroup["hosts"] ] @@ -507,6 +533,7 @@ class TestHDP23StackAdvisor(TestCase): datanodeComponent = self.__getHosts(componentsList, "DATANODE") datanodeComponent["hostnames"] = ['c6402.ambari.apache.org'] + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) expected = { 'type': 'host-component', @@ -538,6 +565,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertPXFServiceAdvisorInfo(services) validations = [validation for validation in self.stackAdvisor.getComponentLayoutValidations(services, hosts) if validation["component-name"] == "PXF"] self.assertEquals(len(validations), 1) expected = { @@ -566,6 +594,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertPXFServiceAdvisorInfo(services) validations = [validation for validation in self.stackAdvisor.getComponentLayoutValidations(services, hosts) if validation["component-name"] == "PXF"] self.assertEquals(len(validations), 1) expected = { @@ -594,6 +623,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertPXFServiceAdvisorInfo(services) validations = [validation for validation in self.stackAdvisor.getComponentLayoutValidations(services, hosts) if validation["component-name"] == "PXF"] self.assertEquals(len(validations), 1) expected = { @@ -622,6 +652,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertPXFServiceAdvisorInfo(services) validations = [validation for validation in self.stackAdvisor.getComponentLayoutValidations(services, hosts) if validation["component-name"] == "PXF"] self.assertEquals(len(validations), 0) @@ -644,6 +675,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) self.assertEquals(len(validations), 0) @@ -657,6 +689,7 @@ class TestHDP23StackAdvisor(TestCase): self.assertEquals(len(hawqStandbyHosts[0]), 1) self.assertEquals(hawqMasterHosts[0][0], hawqStandbyHosts[0][0]) + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) self.assertEquals(len(validations), 1) expected = { @@ -679,6 +712,7 @@ class TestHDP23StackAdvisor(TestCase): self.assertNotEquals(hawqMasterHosts[0][0], hawqStandbyHosts[0][0]) self.assertEquals(hawqMasterHosts[0][0], "c6401.ambari.apache.org") + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) self.assertEquals(len(validations), 1) expected = { @@ -703,6 +737,7 @@ class TestHDP23StackAdvisor(TestCase): self.assertNotEquals(hawqMasterHosts[0][0], hawqStandbyHosts[0][0]) self.assertEquals(hawqStandbyHosts[0][0], "c6401.ambari.apache.org") + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) self.assertEquals(len(validations), 1) expected = { @@ -734,6 +769,7 @@ class TestHDP23StackAdvisor(TestCase): hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]] self.assertEquals(len(hostsList), 3) + self.insertHAWQServiceAdvisorInfo(services) validations = self.stackAdvisor.getComponentLayoutValidations(services, hosts) self.assertEquals(len(validations), 0) @@ -1939,6 +1975,18 @@ class TestHDP23StackAdvisor(TestCase): self.stackAdvisor.recommendTezConfigurations(configurations, clusterData, services, hosts) self.assertEquals(configurations, expected) + def createHAWQServiceAdvisor(self): + path = os.path.join(self.testDirectory, '../../../../../main/resources/common-services/HAWQ/2.0.0/service_advisor.py') + service = {"service_name" : "HAWQ", "service_version" : "2.0", "advisor_path" : path, "advisor_name" : "HAWQ200ServiceAdvisor"} + service = {"StackServices" : service} + return self.stackAdvisor.instantiateServiceAdvisor(service) + + def createPXFServiceAdvisor(self): + path = os.path.join(self.testDirectory, '../../../../../main/resources/common-services/PXF/3.0.0/service_advisor.py') + service = {"service_name" : "PXF", "service_version" : "3.0", "advisor_path" : path, "advisor_name" : "PXF300ServiceAdvisor"} + service = {"StackServices" : service} + return self.stackAdvisor.instantiateServiceAdvisor(service) + def test_recommendHAWQConfigurations(self): # original cluster data with 3 segments @@ -1963,7 +2011,8 @@ class TestHDP23StackAdvisor(TestCase): # Test 1 - with 3 segments self.assertEquals(len(hawqSegmentComponent["hostnames"]), 3) - self.stackAdvisor.recommendHAWQConfigurations(configurations, clusterData, services, None) + serviceAdvisor = self.createHAWQServiceAdvisor() + serviceAdvisor.getServiceConfigurationRecommendations(self.stackAdvisor, configurations, clusterData, services, None) self.assertEquals(configurations["hawq-site"]["properties"]["default_hash_table_bucket_number"], str(3 * 6)) self.assertEquals(configurations["hdfs-client"]["properties"]["output.replace-datanode-on-failure"], "false") @@ -1973,19 +2022,19 @@ class TestHDP23StackAdvisor(TestCase): # Test 2 - with 100 segments hawqSegmentComponent["hostnames"] = ["host" + str(i) for i in range(100)] - self.stackAdvisor.recommendHAWQConfigurations(configurations, clusterData, services, None) + serviceAdvisor.getServiceConfigurationRecommendations(self.stackAdvisor, configurations, clusterData, services, None) self.assertEquals(configurations["hawq-site"]["properties"]["default_hash_table_bucket_number"], str(100 * 5)) self.assertEquals(configurations["hdfs-client"]["properties"]["output.replace-datanode-on-failure"], "true") # Test 3 - with 512 segments hawqSegmentComponent["hostnames"] = ["host" + str(i) for i in range(512)] - self.stackAdvisor.recommendHAWQConfigurations(configurations, clusterData, services, None) + serviceAdvisor.getServiceConfigurationRecommendations(self.stackAdvisor, configurations, clusterData, services, None) self.assertEquals(configurations["hawq-site"]["properties"]["default_hash_table_bucket_number"], "512") self.assertEquals(configurations["hdfs-client"]["properties"]["output.replace-datanode-on-failure"], "true") # Test 4 - with 513 segments hawqSegmentComponent["hostnames"] = ["host" + str(i) for i in range(513)] - self.stackAdvisor.recommendHAWQConfigurations(configurations, clusterData, services, None) + serviceAdvisor.getServiceConfigurationRecommendations(self.stackAdvisor, configurations, clusterData, services, None) self.assertEquals(configurations["hawq-site"]["properties"]["default_hash_table_bucket_number"], "512") self.assertEquals(configurations["hdfs-client"]["properties"]["output.replace-datanode-on-failure"], "true") @@ -1993,9 +2042,9 @@ class TestHDP23StackAdvisor(TestCase): configurations = {} services["configurations"]["hawq-site"] = {"properties":{'hawq-site': {'properties': {}}}} hawqSegmentComponent["hostnames"] = [] - self.stackAdvisor.recommendHAWQConfigurations(configurations, clusterData, services, None) + serviceAdvisor.getServiceConfigurationRecommendations(self.stackAdvisor, configurations, clusterData, services, None) self.assertEquals(configurations, {'hdfs-client': {'properties': {'output.replace-datanode-on-failure': 'false'}}, - 'hawq-site': {'properties': {}}}) + 'hawq-site': {'properties': {}}, 'hdfs-site': {'properties': {'dfs.allow.truncate': 'true'}}}) def test_validateHiveConfigurations(self): properties = {"hive_security_authorization": "None", @@ -2295,7 +2344,8 @@ class TestHDP23StackAdvisor(TestCase): } } - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + serviceAdvisor = self.createHAWQServiceAdvisor() + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) problems_dict = {} for problem in problems: problems_dict[problem['config-name']] = problem @@ -2306,7 +2356,7 @@ class TestHDP23StackAdvisor(TestCase): configurations["hawq-site"] = {"properties": {"hawq_master_directory": "/data/hawq/master", "hawq_segment_directory": "/data/hawq/segment"}} properties = configurations["hawq-site"]["properties"] - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) problems_dict = {} self.assertEqual(len(problems), 0) expected_warnings = {} @@ -2315,7 +2365,7 @@ class TestHDP23StackAdvisor(TestCase): configurations["hawq-site"] = {"properties": {"hawq_master_directory": "/data/hawq/master1,/data/hawq/master2", "hawq_segment_directory": "/data/hawq/segment1 /data/hawq/segment2"}} properties = configurations["hawq-site"]["properties"] - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) problems_dict = {} for problem in problems: problems_dict[problem['config-name']] = problem @@ -2369,7 +2419,7 @@ class TestHDP23StackAdvisor(TestCase): "level": "ERROR" } ] """ - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, services["configurations"], services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, services["configurations"], services, hosts) self.assertEqual(len(problems), 1) expected = { "config-type": "hawq-site", @@ -2383,20 +2433,20 @@ class TestHDP23StackAdvisor(TestCase): # case 2: hawq_global_rm_type is set as yarn, and YARN service is installed. No validation errors expected. services["services"].append({"StackServices" : {"service_name" : "YARN"}, "components":[]}) - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, services["configurations"], services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, services["configurations"], services, hosts) self.assertEqual(len(problems), 0) # Test HAWQ Master port conflict with Ambari Server Postgres port # case 1: HAWQ Master is placed on Ambari Server and HAWQ Master port is same as Ambari Server Postgres Port - self.stackAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=True) + serviceAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=True) configurations = { "hawq-site": { "properties": {"hawq_master_address_port": "5432"} } } - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 1) expected = { "config-name": "hawq_master_address_port", @@ -2409,21 +2459,21 @@ class TestHDP23StackAdvisor(TestCase): self.assertEqual(problems[0], expected) # case 2: HAWQ Master is placed on Ambari Server and HAWQ Master port is different from Ambari Server Postgres Port - self.stackAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=True) + serviceAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=True) configurations["hawq-site"]["properties"]["hawq_master_address_port"] = "10432" - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 0) # case 3: HAWQ Master is not placed on Ambari Server and HAWQ Master port is same as Ambari Server Postgres Port - self.stackAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=False) + serviceAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=False) configurations["hawq-site"]["properties"]["hawq_master_address_port"] = "5432" - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 0) # case 4: HAWQ Master is not placed on Ambari Server and HAWQ Master port is different from Ambari Server Postgres Port - self.stackAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=False) + serviceAdvisor.isHawqMasterComponentOnAmbariServer = MagicMock(return_value=False) configurations["hawq-site"]["properties"]["hawq_master_address_port"] = "10432" - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 0) # -------- test query limits warning ---------- @@ -2453,14 +2503,14 @@ class TestHDP23StackAdvisor(TestCase): 'config-name': 'default_hash_table_bucket_number', 'level': 'ERROR' } - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 1) self.assertEqual(problems[0], expected) configurations["hawq-site"] = {"properties": {"default_hash_table_bucket_number": "500", "hawq_rm_nvseg_perquery_limit": "500"}} properties = configurations["hawq-site"]["properties"] - problems = self.stackAdvisor.validateHAWQSiteConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQSiteConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 0) @@ -2492,19 +2542,20 @@ class TestHDP23StackAdvisor(TestCase): 'level': 'WARN' } - problems = self.stackAdvisor.validateHAWQHdfsClientConfigurations(properties, defaults, configurations, services, hosts) + serviceAdvisor = self.createHAWQServiceAdvisor() + problems = serviceAdvisor.validateHAWQHdfsClientConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 1) self.assertEqual(problems[0], expected) # 2. Try with 3 hosts services["services"][0]["components"][0]["StackServiceComponents"]["hostnames"] = ["host1", "host2", "host3"] - problems = self.stackAdvisor.validateHAWQHdfsClientConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQHdfsClientConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 1) self.assertEqual(problems[0], expected) # 3. Try with 4 hosts - default value services["services"][0]["components"][0]["StackServiceComponents"]["hostnames"] = ["host1", "host2", "host3", "host4"] - problems = self.stackAdvisor.validateHAWQHdfsClientConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQHdfsClientConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 0) # 4. Try with 4 hosts @@ -2516,7 +2567,7 @@ class TestHDP23StackAdvisor(TestCase): 'config-name': 'output.replace-datanode-on-failure', 'level': 'WARN' } - problems = self.stackAdvisor.validateHAWQHdfsClientConfigurations(properties, defaults, configurations, services, hosts) + problems = serviceAdvisor.validateHAWQHdfsClientConfigurations(self.stackAdvisor, properties, defaults, configurations, services, hosts) self.assertEqual(len(problems), 1) self.assertEqual(problems[0], expected)
