AMBARI-20281. Porting changes made in 2.5/stack_advisor to 3.0/YARN/service_advisor for LLAP.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fc8128b0 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fc8128b0 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fc8128b0 Branch: refs/heads/branch-feature-AMBARI-12556 Commit: fc8128b0de0a97c3c8ef9e7064776a5f0030ae6a Parents: 8e6c321 Author: Swapan Shridhar <sshrid...@hortonworks.com> Authored: Thu Mar 2 12:56:54 2017 -0800 Committer: Swapan Shridhar <sshrid...@hortonworks.com> Committed: Thu Mar 2 13:26:38 2017 -0800 ---------------------------------------------------------------------- .../YARN/3.0.0.3.0/service_advisor.py | 90 ++++++++++++-------- 1 file changed, 55 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/fc8128b0/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py index 18938a3..3629f30 100644 --- a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py +++ b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py @@ -509,10 +509,12 @@ class YARNRecommender(service_advisor.ServiceAdvisor): DEFAULT_EXECUTOR_TO_AM_RATIO = 20 MIN_EXECUTOR_TO_AM_RATIO = 10 MAX_CONCURRENT_QUERIES = 32 + MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS = 4 # Concurrency for clusters with <10 executors leafQueueNames = None MB_TO_BYTES = 1048576 hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE) yarn_site = self.getServicesSiteProperties(services, "yarn-site") + min_memory_required = 0 # Update 'hive.llap.daemon.queue.name' prop combo entries self.setLlapDaemonQueuePropAttributes(services, configurations) @@ -537,8 +539,10 @@ class YARNRecommender(service_advisor.ServiceAdvisor): # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive). changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False) llap_named_queue_selected_in_curr_invocation = None - if changed_configs_has_enable_hive_int \ - and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: + # Check if its : 1. 1st invocation from UI ('enable_hive_interactive' in changed-configurations) + # OR 2. 1st invocation from BP (services['changed-configurations'] should be empty in this case) + if (changed_configs_has_enable_hive_int or 0 == len(services['changed-configurations'])) \ + and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames): llap_named_queue_selected_in_curr_invocation = True putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) @@ -552,15 +556,15 @@ class YARNRecommender(service_advisor.ServiceAdvisor): if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and llap_daemon_selected_queue_name == llap_queue_name) or llap_named_queue_selected_in_curr_invocation) or \ - (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation): - Logger.info("Setting visibility of num_llap_nodes to true.") - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true") + (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation): + Logger.info("DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'False'.") + putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "false") selected_queue_is_ambari_managed_llap = True Logger.info("DBG: Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames))) else: - Logger.info("Setting visibility of num_llap_nodes to false.") - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") + Logger.info("DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'True'.") + putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "true") Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames))) selected_queue_is_ambari_managed_llap = False @@ -640,6 +644,25 @@ class YARNRecommender(service_advisor.ServiceAdvisor): self.recommendDefaultLlapConfiguration(configurations, services, hosts) return + # Get calculated value for Slider AM container Size + slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), + yarn_min_container_size) + Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : " + "{1}".format(slider_am_container_size, yarn_min_container_size)) + + min_memory_required = normalized_tez_am_container_size + slider_am_container_size + self._normalizeUp(mem_per_thread_for_llap, yarn_min_container_size) + Logger.info("DBG: Calculated 'min_memory_required': {0} using following : slider_am_container_size: {1}, " + "normalized_tez_am_container_size : {2}, mem_per_thread_for_llap : {3}, yarn_min_container_size : " + "{4}".format(min_memory_required, slider_am_container_size, normalized_tez_am_container_size, mem_per_thread_for_llap, yarn_min_container_size)) + + min_nodes_required = int(ceil( min_memory_required / yarn_nm_mem_in_mb_normalized)) + Logger.info("DBG: Calculated 'min_node_required': {0}, using following : min_memory_required : {1}, yarn_nm_mem_in_mb_normalized " + ": {2}".format(min_nodes_required, min_memory_required, yarn_nm_mem_in_mb_normalized)) + if min_nodes_required > node_manager_cnt: + Logger.warning("ERROR: Not enough memory/nodes to run LLAP"); + self.recommendDefaultLlapConfiguration(configurations, services, hosts) + return + mem_per_thread_for_llap = float(mem_per_thread_for_llap) Logger.info("DBG: selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap)) @@ -656,16 +679,27 @@ class YARNRecommender(service_advisor.ServiceAdvisor): Logger.info("DBG: Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, " "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized, llap_daemon_selected_queue_cap, yarn_min_container_size)) - '''Rounding up numNodes so that we run more daemons, and utilitze more CPUs. The rest of the calcaulkations will take care of cutting this down if required''' + '''Rounding up numNodes so that we run more daemons, and utilitze more CPUs. The rest of the calcaulations will take care of cutting this down if required''' num_llap_nodes_requested = ceil(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized) Logger.info("DBG: Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, " "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized)) + # Pouplate the 'num_llap_nodes_requested' in config 'num_llap_nodes', a read only config for non-Ambari managed queue case. + putHiveInteractiveEnvProperty('num_llap_nodes', num_llap_nodes_requested) + Logger.info("Setting config 'num_llap_nodes' as : {0}".format(num_llap_nodes_requested)) queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)) hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized Logger.info("DBG: Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, " "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized)) else: # Ambari managed 'llap' named queue at root level. - num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input + # Set 'num_llap_nodes_requested' for 1st invocation, as it gets passed as 1 otherwise, read from config. + + # Check if its : 1. 1st invocation from UI ('enable_hive_interactive' in changed-configurations) + # OR 2. 1st invocation from BP (services['changed-configurations'] should be empty in this case) + if (changed_configs_has_enable_hive_int or 0 == len(services['changed-configurations'])) \ + and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: + num_llap_nodes_requested = min_nodes_required + else: + num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized Logger.info("DBG: Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, " "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized)) @@ -689,12 +723,6 @@ class YARNRecommender(service_advisor.ServiceAdvisor): # Common calculations now, irrespective of the queue selected. - # Get calculated value for Slider AM container Size - slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), - yarn_min_container_size) - Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : " - "{1}".format(slider_am_container_size, yarn_min_container_size)) - llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size Logger.info("DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, " "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size)) @@ -771,9 +799,9 @@ class YARNRecommender(service_advisor.ServiceAdvisor): "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : " "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO, mem_per_thread_for_llap, normalized_tez_am_container_size)) - if max_llap_concurreny == 0: - max_llap_concurreny = 1 - Logger.info("DBG: Adjusted 'max_llap_concurreny' : 1.") + if int(max_llap_concurreny) < MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS: + Logger.info("DBG: Adjusting 'max_llap_concurreny' from {0} to {1}".format(max_llap_concurreny, MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS)) + max_llap_concurreny = MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS if (max_llap_concurreny * normalized_tez_am_container_size) > hive_tez_am_cap_available: max_llap_concurreny = floor(hive_tez_am_cap_available / normalized_tez_am_container_size) @@ -808,8 +836,8 @@ class YARNRecommender(service_advisor.ServiceAdvisor): "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size)) if llap_daemon_mem_per_node == 0: # Small cluster. No capacity left on a node after running AMs. - llap_daemon_mem_per_node = mem_per_thread_for_llap - num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap) + llap_daemon_mem_per_node = self._normalizeUp(mem_per_thread_for_llap, yarn_min_container_size) + num_llap_nodes = floor(llap_mem_daemon_size / llap_daemon_mem_per_node) Logger.info("DBG: 'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, " "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap)) elif llap_daemon_mem_per_node < mem_per_thread_for_llap: @@ -882,7 +910,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor): putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny) num_llap_nodes = long(num_llap_nodes) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", min_nodes_required) putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) #TODO A single value is not being set for numNodes in case of a custom queue. Also the attribute is set to non-visible, so the UI likely ends up using an old cached value if (num_llap_nodes != num_llap_nodes_requested): @@ -890,6 +918,9 @@ class YARNRecommender(service_advisor.ServiceAdvisor): else: Logger.info("DBG: Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested)) + putHiveInteractiveEnvProperty('num_llap_nodes_for_llap_daemons', num_llap_nodes) + Logger.info("DBG: Setting config 'num_llap_nodes_for_llap_daemons' as : {0}".format(num_llap_nodes)) + llap_container_size = long(llap_daemon_mem_per_node) putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size) @@ -909,6 +940,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor): putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size) num_executors_per_node = long(num_executors_per_node) + Logger.info("DBG: Putting num_executors_per_node as {0}".format(num_executors_per_node)) putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node) putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", long(num_executors_per_node_max)) @@ -948,6 +980,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor): putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1) putHiveInteractiveEnvProperty('num_llap_nodes', 0) + putHiveInteractiveEnvProperty('num_llap_nodes_for_llap_daemons', 0) putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size) @@ -1344,7 +1377,7 @@ yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu # Check if services["changed-configurations"] is empty and 'yarn.scheduler.minimum-allocation-mb' is modified in current ST invocation. if not services["changed-configurations"] and yarn_site and yarn_min_allocation_property in yarn_site: yarn_min_container_size = yarn_site[yarn_min_allocation_property] - Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from configurations as : {0}".format(yarn_min_container_size)) + Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from output as : {0}".format(yarn_min_container_size)) # Check if 'yarn.scheduler.minimum-allocation-mb' is input in services array. elif yarn_site_properties and yarn_min_allocation_property in yarn_site_properties: @@ -1369,19 +1402,6 @@ yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu if yarn_min_container_size < 256: return 256 - def calculate_slider_am_size(self, yarn_min_container_size): - """ - Calculates the Slider App Master size based on YARN's Minimum Container Size. - - :type yarn_min_container_size int - """ - if yarn_min_container_size > 1024: - return 1024 - if yarn_min_container_size >= 256 and yarn_min_container_size <= 1024: - return yarn_min_container_size - if yarn_min_container_size < 256: - return 256 - def get_yarn_nm_mem_in_mb(self, services, configurations): """ Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).