Repository: ambari
Updated Branches:
  refs/heads/trunk 6168a47ad -> ef9a417fc
AMBARI-17719. HAWQ install should recommend required values for HDFS properties. (adenissov) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ef9a417f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ef9a417f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ef9a417f Branch: refs/heads/trunk Commit: ef9a417fc7755dde7f3cd1986854df84b71a60ae Parents: 6168a47 Author: Alexander Denissov <[email protected]> Authored: Mon Jul 18 15:05:37 2016 -0700 Committer: Alexander Denissov <[email protected]> Committed: Mon Jul 18 15:05:37 2016 -0700 ---------------------------------------------------------------------- .../HAWQ/2.0.0/service_advisor.py | 213 +++++++++++-------- .../src/main/resources/stacks/stack_advisor.py | 14 +- .../HAWQ/test_service_advisor.py | 39 ++++ 3 files changed, 177 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ef9a417f/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/service_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/service_advisor.py b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/service_advisor.py index feb3d52..dffe57d 100644 --- a/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/service_advisor.py +++ b/ambari-server/src/main/resources/common-services/HAWQ/2.0.0/service_advisor.py @@ -129,11 +129,49 @@ class HAWQ200ServiceAdvisor(service_advisor.ServiceAdvisor): return any([self.isLocalHost(host) for host in hawqMasterComponentHosts]) def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): - putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services) - - # Set dfs.allow.truncate to true - putHdfsSiteProperty('dfs.allow.truncate', 'true') - + # Determine 
if the cluster is secured + if "cluster-env" in services["configurations"] and "security_enabled" in services["configurations"]["cluster-env"]["properties"]: + is_secured = services["configurations"]["cluster-env"]["properties"]["security_enabled"] + else: + is_secured = "false" + + # Update HDFS properties in hdfs-site + if "hdfs-site" in services["configurations"]: + hdfs_site = services["configurations"]["hdfs-site"]["properties"] + putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services) + putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site") + + hdfs_site_desired_values = { + "dfs.allow.truncate" : "true", + "dfs.block.access.token.enable" : is_secured, + "dfs.block.local-path-access.user" : "gpadmin", + "dfs.client.read.shortcircuit" : "true", + "dfs.client.use.legacy.blockreader.local" : "false", + "dfs.datanode.data.dir.perm" : "750", + "dfs.datanode.handler.count" : "60", + "dfs.datanode.max.transfer.threads" : "40960", + "dfs.namenode.accesstime.precision" : "0", + "dfs.namenode.handler.count" : "200", + "dfs.support.append" : "true" + } + for property, desired_value in hdfs_site_desired_values.iteritems(): + if property not in hdfs_site or hdfs_site[property] != desired_value: + putHdfsSiteProperty(property, desired_value) + + # Update HDFS properties in core-site + if "core-site" in services["configurations"]: + core_site = services["configurations"]["core-site"]["properties"] + putCoreSiteProperty = self.putProperty(configurations, "core-site", services) + + core_site_desired_values = { + "ipc.client.connection.maxidletime" : "3600000", + "ipc.server.listen.queue.size" : "3300" + } + for property, desired_value in core_site_desired_values.iteritems(): + if property not in core_site or core_site[property] != desired_value: + putCoreSiteProperty(property, desired_value) + + # Process HAWQ specific properties if any(x in services["configurations"] for x in ["hawq-site", "hdfs-client", 
"hawq-sysctl-env"]): componentsListList = [service["components"] for service in services["services"]] componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist] @@ -145,88 +183,89 @@ class HAWQ200ServiceAdvisor(service_advisor.ServiceAdvisor): minHawqHostsMemory = min([host['Hosts']['total_mem'] for host in hosts['items'] if host['Hosts']['host_name'] in hawqHosts]) minHawqHostsCoreCount = min([host['Hosts']['cpu_count'] for host in hosts['items'] if host['Hosts']['host_name'] in hawqHosts]) - hawq_site = services["configurations"]["hawq-site"]["properties"] - putHawqSiteProperty = self.putProperty(configurations, "hawq-site", services) - putHawqSitePropertyAttribute = self.putPropertyAttribute(configurations, "hawq-site") - hawq_sysctl_env = services["configurations"]["hawq-sysctl-env"]["properties"] - putHawqSysctlEnvProperty = self.putProperty(configurations, "hawq-sysctl-env", services) - putHawqSysctlEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hawq-sysctl-env") - - # remove master port when master is colocated with Ambari server - if self.isHawqMasterComponentOnAmbariServer(services) and "hawq_master_address_port" in hawq_site: - putHawqSiteProperty('hawq_master_address_port', '') - - # update query limits if segments are deployed - if numSegments and "default_hash_table_bucket_number" in hawq_site and "hawq_rm_nvseg_perquery_limit" in hawq_site: - factor_min = 1 - factor_max = 6 - limit = int(hawq_site["hawq_rm_nvseg_perquery_limit"]) - factor = limit / numSegments - # if too many segments or default limit is too low --> stick with the limit - if factor < factor_min: - buckets = limit - # if the limit is large and results in factor > max --> limit factor to max - elif factor > factor_max: - buckets = factor_max * numSegments - else: - buckets = factor * numSegments - putHawqSiteProperty('default_hash_table_bucket_number', buckets) - 
putHawqSitePropertyAttribute('default_hash_table_bucket_number', "maximum", numSegments * 16 if 10000 > numSegments * 16 else 10000) - - # update YARN RM urls with the values from yarn-site if YARN is installed - if "YARN" in servicesList and "yarn-site" in services["configurations"]: - yarn_site = services["configurations"]["yarn-site"]["properties"] - for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items(): - if hs_prop in hawq_site and ys_prop in yarn_site: - putHawqSiteProperty(hs_prop, yarn_site[ys_prop]) - - putHawqSiteProperty('hawq_rm_nvcore_limit_perseg', minHawqHostsCoreCount) - - if "vm.overcommit_memory" in hawq_sysctl_env: - MEM_THRESHOLD = 33554432 # 32GB, minHawqHostsMemory is represented in kB - # Set the value for hawq_rm_memory_limit_perseg based on vm.overcommit value and RAM available on HAWQ Hosts - # If value of hawq_rm_memory_limit_perseg is 67108864KB, it indicates hawq is being added and recommendation - # has not be made yet, since after recommendation it will be in GB in case its 67108864KB. 
- vm_overcommit_ratio = int(hawq_sysctl_env["vm.overcommit_ratio"]) if "vm.overcommit_ratio" in hawq_sysctl_env else 50 - if "hawq_rm_memory_limit_perseg" in hawq_site and hawq_site["hawq_rm_memory_limit_perseg"] == "65535MB": - vm_overcommit_mem_value = 2 if minHawqHostsMemory >= MEM_THRESHOLD else 1 - else: - # set vm.overcommit_memory to 2 if the minimum memory among all hawqHosts is greater than 32GB - vm_overcommit_mem_value = int(hawq_sysctl_env["vm.overcommit_memory"]) - putHawqSysctlEnvProperty("vm.overcommit_ratio", vm_overcommit_ratio) - # Show vm.overcommit_ratio on theme only if vm.overcommit_memory is set to 2 - overcommit_ratio_visibility = "true" if vm_overcommit_mem_value == 2 else "false" - putHawqSysctlEnvPropertyAttribute("vm.overcommit_ratio", "visible", overcommit_ratio_visibility) - putHawqSysctlEnvProperty("vm.overcommit_memory", vm_overcommit_mem_value) - host_ram_kb = minHawqHostsMemory * vm_overcommit_ratio / 100 if vm_overcommit_mem_value == 2 else minHawqHostsMemory - host_ram_gb = float(host_ram_kb) / (1024 * 1024) - recommended_mem_percentage = { - host_ram_gb <= 64: .75, - 64 < host_ram_gb <= 512: .85, - host_ram_gb > 512: .95 - }[True] - recommended_mem = math.ceil(host_ram_gb * recommended_mem_percentage) - unit = "GB" - # If RAM on a host is very low ~ 2 GB, ceil function may result in making it equal to total mem, - # in that case we recommend the value in MB not GB - if recommended_mem >= host_ram_gb: - recommended_mem = math.ceil(float(host_ram_kb)/1024 * recommended_mem_percentage) - unit = "MB" - # hawq_rm_memory_limit_perseg does not support decimal value so trim decimal using int - putHawqSiteProperty("hawq_rm_memory_limit_perseg", "{0}{1}".format(int(recommended_mem), unit)) - - # Show / Hide properties based on the value of hawq_global_rm_type - YARN_MODE = True if hawq_site["hawq_global_rm_type"].lower() == "yarn" else False - yarn_mode_properties_visibility = { - "hawq_rm_memory_limit_perseg": False, - 
"hawq_rm_nvcore_limit_perseg": False, - "hawq_rm_yarn_app_name": True, - "hawq_rm_yarn_queue_name": True, - "hawq_rm_yarn_scheduler_address": True, - "hawq_rm_yarn_address": True - } - for property, visible_status in yarn_mode_properties_visibility.iteritems(): - putHawqSitePropertyAttribute(property, "visible", str(visible_status if YARN_MODE else not visible_status).lower()) + if "hawq-site" in services["configurations"]: + hawq_site = services["configurations"]["hawq-site"]["properties"] + putHawqSiteProperty = self.putProperty(configurations, "hawq-site", services) + putHawqSitePropertyAttribute = self.putPropertyAttribute(configurations, "hawq-site") + hawq_sysctl_env = services["configurations"]["hawq-sysctl-env"]["properties"] + putHawqSysctlEnvProperty = self.putProperty(configurations, "hawq-sysctl-env", services) + putHawqSysctlEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hawq-sysctl-env") + + # remove master port when master is colocated with Ambari server + if self.isHawqMasterComponentOnAmbariServer(services) and "hawq_master_address_port" in hawq_site: + putHawqSiteProperty('hawq_master_address_port', '') + + # update query limits if segments are deployed + if numSegments and "default_hash_table_bucket_number" in hawq_site and "hawq_rm_nvseg_perquery_limit" in hawq_site: + factor_min = 1 + factor_max = 6 + limit = int(hawq_site["hawq_rm_nvseg_perquery_limit"]) + factor = limit / numSegments + # if too many segments or default limit is too low --> stick with the limit + if factor < factor_min: + buckets = limit + # if the limit is large and results in factor > max --> limit factor to max + elif factor > factor_max: + buckets = factor_max * numSegments + else: + buckets = factor * numSegments + putHawqSiteProperty('default_hash_table_bucket_number', buckets) + putHawqSitePropertyAttribute('default_hash_table_bucket_number', "maximum", numSegments * 16 if 10000 > numSegments * 16 else 10000) + + # update YARN RM urls with the values 
from yarn-site if YARN is installed + if "YARN" in servicesList and "yarn-site" in services["configurations"]: + yarn_site = services["configurations"]["yarn-site"]["properties"] + for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items(): + if hs_prop in hawq_site and ys_prop in yarn_site: + putHawqSiteProperty(hs_prop, yarn_site[ys_prop]) + + putHawqSiteProperty('hawq_rm_nvcore_limit_perseg', minHawqHostsCoreCount) + + if "vm.overcommit_memory" in hawq_sysctl_env: + MEM_THRESHOLD = 33554432 # 32GB, minHawqHostsMemory is represented in kB + # Set the value for hawq_rm_memory_limit_perseg based on vm.overcommit value and RAM available on HAWQ Hosts + # If value of hawq_rm_memory_limit_perseg is 67108864KB, it indicates hawq is being added and recommendation + # has not be made yet, since after recommendation it will be in GB in case its 67108864KB. + vm_overcommit_ratio = int(hawq_sysctl_env["vm.overcommit_ratio"]) if "vm.overcommit_ratio" in hawq_sysctl_env else 50 + if "hawq_rm_memory_limit_perseg" in hawq_site and hawq_site["hawq_rm_memory_limit_perseg"] == "65535MB": + vm_overcommit_mem_value = 2 if minHawqHostsMemory >= MEM_THRESHOLD else 1 + else: + # set vm.overcommit_memory to 2 if the minimum memory among all hawqHosts is greater than 32GB + vm_overcommit_mem_value = int(hawq_sysctl_env["vm.overcommit_memory"]) + putHawqSysctlEnvProperty("vm.overcommit_ratio", vm_overcommit_ratio) + # Show vm.overcommit_ratio on theme only if vm.overcommit_memory is set to 2 + overcommit_ratio_visibility = "true" if vm_overcommit_mem_value == 2 else "false" + putHawqSysctlEnvPropertyAttribute("vm.overcommit_ratio", "visible", overcommit_ratio_visibility) + putHawqSysctlEnvProperty("vm.overcommit_memory", vm_overcommit_mem_value) + host_ram_kb = minHawqHostsMemory * vm_overcommit_ratio / 100 if vm_overcommit_mem_value == 2 else minHawqHostsMemory + host_ram_gb = float(host_ram_kb) / (1024 * 1024) + recommended_mem_percentage = { + host_ram_gb <= 64: .75, + 64 < 
host_ram_gb <= 512: .85, + host_ram_gb > 512: .95 + }[True] + recommended_mem = math.ceil(host_ram_gb * recommended_mem_percentage) + unit = "GB" + # If RAM on a host is very low ~ 2 GB, ceil function may result in making it equal to total mem, + # in that case we recommend the value in MB not GB + if recommended_mem >= host_ram_gb: + recommended_mem = math.ceil(float(host_ram_kb)/1024 * recommended_mem_percentage) + unit = "MB" + # hawq_rm_memory_limit_perseg does not support decimal value so trim decimal using int + putHawqSiteProperty("hawq_rm_memory_limit_perseg", "{0}{1}".format(int(recommended_mem), unit)) + + # Show / Hide properties based on the value of hawq_global_rm_type + YARN_MODE = True if hawq_site["hawq_global_rm_type"].lower() == "yarn" else False + yarn_mode_properties_visibility = { + "hawq_rm_memory_limit_perseg": False, + "hawq_rm_nvcore_limit_perseg": False, + "hawq_rm_yarn_app_name": True, + "hawq_rm_yarn_queue_name": True, + "hawq_rm_yarn_scheduler_address": True, + "hawq_rm_yarn_address": True + } + for property, visible_status in yarn_mode_properties_visibility.iteritems(): + putHawqSitePropertyAttribute(property, "visible", str(visible_status if YARN_MODE else not visible_status).lower()) # set output.replace-datanode-on-failure in HAWQ hdfs-client depending on the cluster size if "hdfs-client" in services["configurations"]: http://git-wip-us.apache.org/repos/asf/ambari/blob/ef9a417f/ambari-server/src/main/resources/stacks/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py index fb30f4d..437fe4f 100644 --- a/ambari-server/src/main/resources/stacks/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/stack_advisor.py @@ -674,6 +674,9 @@ class DefaultStackAdvisor(StackAdvisor): configurations = {} + # there can be dependencies between service recommendations which 
require special ordering + # for now, make sure custom services (that have service advisors) run after standard ones + serviceAdvisors = [] for service in services["services"]: serviceName = service["StackServices"]["service_name"] calculation = self.getServiceConfigurationRecommender(serviceName) @@ -682,7 +685,9 @@ class DefaultStackAdvisor(StackAdvisor): else: serviceAdvisor = self.getServiceAdvisor(serviceName) if serviceAdvisor is not None: - serviceAdvisor.getServiceConfigurationRecommendations(configurations, cgClusterSummary, cgServices, cgHosts) + serviceAdvisors.append(serviceAdvisor) + for serviceAdvisor in serviceAdvisors: + serviceAdvisor.getServiceConfigurationRecommendations(configurations, cgClusterSummary, cgServices, cgHosts) cgRecommendation = { "configurations": {}, @@ -745,6 +750,9 @@ class DefaultStackAdvisor(StackAdvisor): else: configurations = recommendations["recommendations"]["blueprint"]["configurations"] + # there can be dependencies between service recommendations which require special ordering + # for now, make sure custom services (that have service advisors) run after standard ones + serviceAdvisors = [] for service in services["services"]: serviceName = service["StackServices"]["service_name"] calculation = self.getServiceConfigurationRecommender(serviceName) @@ -753,7 +761,9 @@ class DefaultStackAdvisor(StackAdvisor): else: serviceAdvisor = self.getServiceAdvisor(serviceName) if serviceAdvisor is not None: - serviceAdvisor.getServiceConfigurationRecommendations(configurations, clusterSummary, services, hosts) + serviceAdvisors.append(serviceAdvisor) + for serviceAdvisor in serviceAdvisors: + serviceAdvisor.getServiceConfigurationRecommendations(configurations, clusterSummary, services, hosts) return recommendations http://git-wip-us.apache.org/repos/asf/ambari/blob/ef9a417f/ambari-server/src/test/python/common-services/HAWQ/test_service_advisor.py ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/test/python/common-services/HAWQ/test_service_advisor.py b/ambari-server/src/test/python/common-services/HAWQ/test_service_advisor.py index 780370d..f8a9468 100644 --- a/ambari-server/src/test/python/common-services/HAWQ/test_service_advisor.py +++ b/ambari-server/src/test/python/common-services/HAWQ/test_service_advisor.py @@ -182,6 +182,18 @@ class TestHAWQ200ServiceAdvisor(TestCase): "hawq_rm_nvcore_limit_perseg": "16", "hawq_global_rm_type": "yarn" } + }, + "hdfs-site": { + "properties": { + } + }, + "core-site": { + "properties": { + } + }, + "cluster-env": { + "properties": { + } } } @@ -251,6 +263,33 @@ class TestHAWQ200ServiceAdvisor(TestCase): ] } + ## Test that HDFS parameters required by HAWQ are recommended + self.serviceAdvisor.getServiceConfigurationRecommendations(configurations, None, services, hosts) + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.allow.truncate"], "true") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.block.access.token.enable"], "false") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.block.local-path-access.user"], "gpadmin") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.client.read.shortcircuit"], "true") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.client.use.legacy.blockreader.local"], "false") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir.perm"], "750") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.datanode.handler.count"], "60") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.datanode.max.transfer.threads"], "40960") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.namenode.accesstime.precision"], "0") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.namenode.handler.count"], "200") + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.support.append"], "true") + + 
self.assertEquals(configurations["core-site"]["properties"]["ipc.client.connection.maxidletime"], "3600000") + self.assertEquals(configurations["core-site"]["properties"]["ipc.server.listen.queue.size"], "3300") + + + configurations["cluster-env"]["properties"]["security_enabled"]="false" + self.serviceAdvisor.getServiceConfigurationRecommendations(configurations, None, services, hosts) + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.block.access.token.enable"], "false") + + configurations["cluster-env"]["properties"]["security_enabled"]="true" + self.serviceAdvisor.getServiceConfigurationRecommendations(configurations, None, services, hosts) + self.assertEquals(configurations["hdfs-site"]["properties"]["dfs.block.access.token.enable"], "true") + + ## Test if hawq_rm_nvcore_limit_perseg is set correctly # Case 1:
