http://git-wip-us.apache.org/repos/asf/ambari/blob/82e44131/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py index b52d753..ddf407e 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py @@ -18,13 +18,13 @@ limitations under the License. """ import math +import traceback from ambari_commons.str_utils import string_set_equals from resource_management.core.logger import Logger from resource_management.core.exceptions import Fail from resource_management.libraries.functions.get_bare_principal import get_bare_principal - class HDP25StackAdvisor(HDP24StackAdvisor): def __init__(self): @@ -94,7 +94,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): parentItems = super(HDP25StackAdvisor, self).getComponentLayoutValidations(services, hosts) childItems = [] - hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") if len(hsi_hosts) > 1: message = "Only one host can install HIVE_SERVER_INTERACTIVE. " childItems.append( @@ -139,12 +139,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor): "item": self.getWarnItem( "Should be set to recommended value to report metrics to Ambari Metrics service.")}) + return self.toConfigurationValidationProblems(validationItems, "storm-site") def validateAtlasConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - application_properties = self.getSiteProperties(configurations, "application-properties") + application_properties = getSiteProperties(configurations, "application-properties") validationItems = [] + #<editor-fold desc="LDAP and AD"> auth_type = application_properties['atlas.authentication.method.ldap.type'] auth_ldap_enable = application_properties['atlas.authentication.method.ldap'].lower() == 'true' Logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type)) @@ -181,6 +183,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if prop not in application_properties or application_properties[prop] is None or application_properties[prop].strip() == "": validationItems.append({"config-name": prop, "item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)}) + #</editor-fold> if application_properties['atlas.graph.index.search.backend'] == 'solr5' and \ not application_properties['atlas.graph.index.search.solr.zookeeper-url']: @@ -249,13 +252,13 @@ class HDP25StackAdvisor(HDP24StackAdvisor): def validateYarnConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): parentValidationProblems = super(HDP25StackAdvisor, self).validateYARNConfigurations(properties, recommendedDefaults, configurations, services, hosts) - yarn_site_properties = self.getSiteProperties(configurations, "yarn-site") + yarn_site_properties = getSiteProperties(configurations, "yarn-site") servicesList = [service["StackServices"]["service_name"] for service in services["services"]] componentsListList = [service["components"] for service in services["services"]] componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist] validationItems = [] - hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") if len(hsi_hosts) > 0: # HIVE_SERVER_INTERACTIVE is mapped to a host if 'yarn.resourcemanager.work-preserving-recovery.enabled' not in yarn_site_properties or \ @@ -268,116 +271,124 @@ class HDP25StackAdvisor(HDP24StackAdvisor): validationProblems.extend(parentValidationProblems) return validationProblems + """ + Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs. + 1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP + and Hive2 app. + 2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'. + 3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2. + 4. 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) should not consume more that 50% of selected queue for LLAP. + 5. if 'llap' queue is selected, in order to run Service Checks, 'remaining available capacity' in cluster is atleast 512 MB. + """ def validateHiveInteractiveSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - """ - Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs. - 1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP - and Hive2 app. - 2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'. - 3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2. - 4. 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) should not consume more that 50% of selected queue for LLAP. - 5. if 'llap' queue is selected, in order to run Service Checks, 'remaining available capacity' in cluster is atleast 512 MB. - """ validationItems = [] - hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") - llap_queue_name = None - llap_queue_cap_perc = None + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + curr_selected_queue_for_llap = None + curr_selected_queue_for_llap_cap_perc = None MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512 - llap_queue_cap = None - hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE) - - if len(hsi_hosts) == 0: - return [] + current_selected_queue_for_llap_cap = None - # Get total cluster capacity - node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER") - node_manager_cnt = len(node_manager_host_list) - yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) - total_cluster_cap = node_manager_cnt * yarn_nm_mem_in_mb - capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) + if len(hsi_hosts) > 0: + # Get total cluster capacity + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) + yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) + total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb - if not capacity_scheduler_properties: - Logger.warning("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.") - return [] - - if hsi_site: - if "hive.llap.daemon.queue.name" in hsi_site and hsi_site['hive.llap.daemon.queue.name']: - llap_queue_name = hsi_site['hive.llap.daemon.queue.name'] - llap_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_queue_name, total_cluster_cap) - - if llap_queue_cap: - llap_queue_cap_perc = float(llap_queue_cap * 100 / total_cluster_cap) - min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) - - # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required - # to run LLAP and Hive2 app. - if llap_queue_cap_perc < min_reqd_queue_cap_perc: - errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \ - "app to run".format(llap_queue_name, llap_queue_cap_perc, min_reqd_queue_cap_perc) - validationItems.append({"config-name": "hive.llap.daemon.queue.name", "item": self.getErrorItem(errMsg1)}) - else: - Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for " - "Hive Server Interactive.".format(llap_queue_name)) - - # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED. - llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_queue_name) - if llap_selected_queue_state: - if llap_selected_queue_state == "STOPPED": - errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\ - .format(llap_queue_name, llap_selected_queue_state) - validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)}) + capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) + if capacity_scheduler_properties: + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + if curr_selected_queue_for_llap: + current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, + curr_selected_queue_for_llap, total_cluster_capacity) + if current_selected_queue_for_llap_cap: + curr_selected_queue_for_llap_cap_perc = int(current_selected_queue_for_llap_cap * 100 / total_cluster_capacity) + min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) + + # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required + # to run LLAP and Hive2 app. + if curr_selected_queue_for_llap_cap_perc < min_reqd_queue_cap_perc: + errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \ + "app to run".format(curr_selected_queue_for_llap, curr_selected_queue_for_llap_cap_perc, min_reqd_queue_cap_perc) + validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg1)}) + else: + Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for " + "Hive Server Interactive.".format(curr_selected_queue_for_llap)) + + # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED. + llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap) + if llap_selected_queue_state: + if llap_selected_queue_state == "STOPPED": + errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\ + .format(curr_selected_queue_for_llap, llap_selected_queue_state) + validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)}) + else: + Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for " + "Hive Server Interactive.".format(curr_selected_queue_for_llap)) + else: + Logger.error("Couldn't retrieve current selection for 'hive.llap.daemon.queue.name' while doing validation " + "checks for Hive Server Interactive.") else: - Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for " - "Hive Server Interactive.".format(llap_queue_name)) + Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing " + "validation checks for Hive Server Interactive.") + pass else: - Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing " - "validation checks for Hive Server Interactive.") - - # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2. - if 'hive.server2.enable.doAs' in hsi_site and hsi_site['hive.server2.enable.doAs'] == "true": - validationItems.append({"config-name": "hive.server2.enable.doAs", "item": self.getErrorItem("Value should be set to 'false' for Hive2.")}) - - # Validate that 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) is not consuming more that - # 50% of selected queue for LLAP. - if llap_queue_cap and 'hive.server2.tez.sessions.per.default.queue' in hsi_site: - num_tez_sessions = hsi_site['hive.server2.tez.sessions.per.default.queue'] - if num_tez_sessions: - num_tez_sessions = long(num_tez_sessions) - yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations)) - tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap)) - normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) - llap_selected_queue_cap_remaining = llap_queue_cap - (normalized_tez_am_container_size * num_tez_sessions) - if llap_selected_queue_cap_remaining <= llap_queue_cap/2: - errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \ - "'{1}' queue for LLAP.".format(num_tez_sessions, llap_queue_name) - validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)}) - - # Validate that 'remaining available capacity' in cluster is at least 512 MB, after 'llap' queue is selected, - # in order to run Service Checks. - if llap_queue_name and llap_queue_cap_perc and llap_queue_name == self.AMBARI_MANAGED_LLAP_QUEUE_NAME: - curr_selected_queue_for_llap_cap = float(llap_queue_cap_perc) / 100 * total_cluster_cap - available_cap_in_cluster = total_cluster_cap - curr_selected_queue_for_llap_cap - if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS: - errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \ - "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster) - validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)}) + Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.") + pass + + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2. + if 'hive.server2.enable.doAs' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + hive2_enable_do_as = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.server2.enable.doAs'] + if hive2_enable_do_as == 'true': + validationItems.append({"config-name": "hive.server2.enable.doAs","item": self.getErrorItem("Value should be set to 'false' for Hive2.")}) + + # Validate that 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) is not consuming more that + # 50% of selected queue for LLAP. + if current_selected_queue_for_llap_cap and 'hive.server2.tez.sessions.per.default.queue' in \ + services['configurations']['hive-interactive-site']['properties']: + num_tez_sessions = services['configurations']['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue'] + if num_tez_sessions: + num_tez_sessions = long(num_tez_sessions) + yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) + tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity)) + normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) + llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions) + if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2: + errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \ + "'{1}' queue for LLAP.".format(num_tez_sessions, curr_selected_queue_for_llap) + validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)}) + + # Validate that 'remaining available capacity' in cluster is atleast 512 MB, after 'llap' queue is selected, + # in order to run Service Checks. + if curr_selected_queue_for_llap and curr_selected_queue_for_llap_cap_perc and \ + curr_selected_queue_for_llap == self.AMBARI_MANAGED_LLAP_QUEUE_NAME: + curr_selected_queue_for_llap_cap = float(curr_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity + available_cap_in_cluster = total_cluster_capacity - curr_selected_queue_for_llap_cap + if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS: + errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \ + "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster) + validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)}) validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site") return validationProblems def validateHiveInteractiveEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - hive_site_env_properties = self.getSiteProperties(configurations, "hive-interactive-env") + hive_site_env_properties = getSiteProperties(configurations, "hive-interactive-env") validationItems = [] - hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") if len(hsi_hosts) > 0: # HIVE_SERVER_INTERACTIVE is mapped to a host if 'enable_hive_interactive' not in hive_site_env_properties or ( - 'enable_hive_interactive' in hive_site_env_properties and - hive_site_env_properties['enable_hive_interactive'].lower() != 'true'): - + 'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[ + 'enable_hive_interactive'].lower() != 'true'): validationItems.append({"config-name": "enable_hive_interactive", "item": self.getErrorItem( "HIVE_SERVER_INTERACTIVE requires enable_hive_interactive in hive-interactive-env set to true.")}) + pass + else: # no HIVE_SERVER_INTERACTIVE if 'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[ @@ -385,6 +396,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor): validationItems.append({"config-name": "enable_hive_interactive", "item": self.getErrorItem( "enable_hive_interactive in hive-interactive-env should be set to false.")}) + pass + pass validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-env") return validationProblems @@ -424,15 +437,15 @@ class HDP25StackAdvisor(HDP24StackAdvisor): def recommendStormConfigurations(self, configurations, clusterData, services, hosts): super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts) - storm_site = self.getServicesSiteProperties(services, "storm-site") + storm_site = getServicesSiteProperties(services, "storm-site") putStormSiteProperty = self.putProperty(configurations, "storm-site", services) putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site) - + if security_enabled: _storm_principal_name = services['configurations']['storm-env']['properties']['storm_principal_name'] storm_bare_jaas_principal = get_bare_principal(_storm_principal_name) - if 'nimbus.impersonation.acl' in storm_site: + if 'nimbus.impersonation.acl' in storm_site: storm_nimbus_impersonation_acl = storm_site["nimbus.impersonation.acl"] storm_nimbus_impersonation_acl.replace('{{storm_bare_jaas_principal}}', storm_bare_jaas_principal) putStormSiteProperty('nimbus.impersonation.acl', storm_nimbus_impersonation_acl) @@ -599,7 +612,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if 'atlas-env' in services['configurations']: atlas_server_metadata_size = 50000 if 'atlas_server_metadata_size' in services['configurations']['atlas-env']['properties']: - atlas_server_metadata_size = float(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size']) + atlas_server_metadata_size = int(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size']) atlas_server_xmx = 2048 @@ -656,42 +669,47 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") # For 'Hive Server Interactive', if the component exists. - hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") - hsi_properties = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE) - + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") if len(hsi_hosts) > 0: + hsi_host = hsi_hosts[0] putHiveInteractiveEnvProperty('enable_hive_interactive', 'true') # Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed. - if hsi_properties and 'hive.llap.daemon.queue.name' in hsi_properties: + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: self.setLlapDaemonQueuePropAttributes(services, configurations) - hsi_conf_properties = self.getSiteProperties(configurations, self.HIVE_INTERACTIVE_SITE) - - hive_tez_default_queue = hsi_properties["hive.llap.daemon.queue.name"] - if hsi_conf_properties and "hive.llap.daemon.queue.name" in hsi_conf_properties: - hive_tez_default_queue = hsi_conf_properties['hive.llap.daemon.queue.name'] - + # Update 'hive.server2.tez.default.queues' value + hive_tez_default_queue = None + if 'hive-interactive-site' in configurations and \ + 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: + hive_tez_default_queue = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + Logger.info("'hive.llap.daemon.queue.name' value from configurations : '{0}'".format(hive_tez_default_queue)) + if not hive_tez_default_queue: + hive_tez_default_queue = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + Logger.info("'hive.llap.daemon.queue.name' value from services : '{0}'".format(hive_tez_default_queue)) if hive_tez_default_queue: putHiveInteractiveSiteProperty("hive.server2.tez.default.queues", hive_tez_default_queue) - Logger.debug("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue)) + Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue)) else: putHiveInteractiveEnvProperty('enable_hive_interactive', 'false') putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") - if hsi_properties and "hive.llap.zk.sm.connectionString" in hsi_properties: + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + # Fill the property 'hive.llap.zk.sm.connectionString' required by Hive Server Interactive (HiveServer2) zookeeper_host_port = self.getZKHostPortString(services) if zookeeper_host_port: putHiveInteractiveSiteProperty("hive.llap.zk.sm.connectionString", zookeeper_host_port) + pass def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): super(HDP25StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts) - hsi_env_poperties = self.getServicesSiteProperties(services, "hive-interactive-env") - cluster_env = self.getServicesSiteProperties(services, "cluster-env") # Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP) - if hsi_env_poperties and 'enable_hive_interactive' in hsi_env_poperties: - enable_hive_interactive = hsi_env_poperties['enable_hive_interactive'] + if 'hive-interactive-env' in services['configurations'] and \ + 'enable_hive_interactive' in services['configurations']['hive-interactive-env']['properties']: + enable_hive_interactive = services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive'] LLAP_QUEUE_NAME = 'llap' # Hive Server interactive is already added or getting added @@ -702,8 +720,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services) stack_root = "/usr/hdp" - if cluster_env and "stack_root" in cluster_env: - stack_root = cluster_env["stack_root"] + if "cluster-env" in services["configurations"] and "stack_root" in services["configurations"]["cluster-env"]["properties"]: + stack_root = services["configurations"]["cluster-env"]["properties"]["stack_root"] timeline_plugin_classes_values = [] timeline_plugin_classpath_values = [] @@ -718,457 +736,546 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values)) putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values)) - def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name): - """ - Entry point for updating Hive's 'LLAP app' configs namely : - (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb - (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb, - (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size - (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and - (13). hive.llap.io.enabled. + """ + Entry point for updating Hive's 'LLAP app' configs namely : (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb + (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb, + (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size + (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and + (13). hive.llap.io.enabled. - The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following: - (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue' - (4). Change in queue selection for config 'hive.llap.daemon.queue.name'. + The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following: + (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue' + (4). Change in queue selection for config 'hive.llap.daemon.queue.name'. - If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config - value is not calulated, but read and use in calculation for dependent configs. + If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config + value is not calulated, but read and use in calculation for dependent configs. - Note: All memory calculations are in MB, unless specified otherwise. - """ + Note: All memory caluclations are in MB, unless specified otherwise. + """ + def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name): + Logger.info("Entered updateLlapConfigs() ..") putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) + putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") + putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services) + llap_daemon_selected_queue_name = None - selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed. + selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed. llap_selected_queue_am_percent = None DEFAULT_EXECUTOR_TO_AM_RATIO = 20 MIN_EXECUTOR_TO_AM_RATIO = 10 MAX_CONCURRENT_QUERIES = 32 leafQueueNames = None MB_TO_BYTES = 1048576 - hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE) - yarn_site = self.getServicesSiteProperties(services, "yarn-site") # Update 'hive.llap.daemon.queue.name' prop combo entries self.setLlapDaemonQueuePropAttributes(services, configurations) if not services["changed-configurations"]: read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations)) - putHiveInteractiveSiteProperty("hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb) + putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', read_llap_daemon_yarn_cont_mb) + # initial memory setting to make sure hive.llap.daemon.yarn.container.mb >= yarn.scheduler.minimum-allocation-mb + Logger.info("Adjusted 'hive.llap.daemon.yarn.container.mb' to yarn min container size as initial size " + "(" + str(self.get_yarn_min_container_size(services, configurations)) + " MB).") - if hsi_site and "hive.llap.daemon.queue.name" in hsi_site: - llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"] + try: + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] - # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'. - capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) - if capacity_scheduler_properties: - # Get all leaf queues. - leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) - Logger.info("YARN leaf Queues = {0}".format(leafQueueNames)) - if len(leafQueueNames) == 0: - Logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.") - return - - # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive). - changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False) - llap_named_queue_selected_in_curr_invocation = None - if changed_configs_has_enable_hive_int \ - and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: - if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames): - llap_named_queue_selected_in_curr_invocation = True - putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) - putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name) + # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'. + capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) + if capacity_scheduler_properties: + # Get all leaf queues. + leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) + Logger.info("YARN leaf Queues = {0}".format(leafQueueNames)) + if len(leafQueueNames) == 0: + raise Fail("Queue(s) couldn't be retrieved from capacity-scheduler.") + + # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive). + changed_configs_has_enable_hive_int = self.are_config_props_in_changed_configs(services, "hive-interactive-env", + set(['enable_hive_interactive']), False) + llap_named_queue_selected_in_curr_invocation = None + if changed_configs_has_enable_hive_int \ + and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: + if (len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames)): + llap_named_queue_selected_in_curr_invocation = True + putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) + putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name) + Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(llap_queue_name)) + else: + first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected. + putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue) + putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue) + llap_named_queue_selected_in_curr_invocation = False + Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(first_leaf_queue)) + Logger.info("llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation)) + + if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \ + llap_named_queue_selected_in_curr_invocation) or \ + (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation): + putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true") + Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " + "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames))) + selected_queue_is_ambari_managed_llap = True else: - first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected. - putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue) - putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue) - llap_named_queue_selected_in_curr_invocation = False - - if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and llap_daemon_selected_queue_name == llap_queue_name) or - llap_named_queue_selected_in_curr_invocation) or \ - (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation): - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true") - selected_queue_is_ambari_managed_llap = True + putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") + Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " + "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames))) + selected_queue_is_ambari_managed_llap = False + + if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have + # state information pertaining to 'llap' queue. + # Check: State of the selected queue should not be STOPPED. + if llap_daemon_selected_queue_name: + llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) + if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED": + raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default " + "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state)) + else: + raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." + .format(llap_daemon_selected_queue_name)) else: - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") - selected_queue_is_ambari_managed_llap = False - - if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have - # state information pertaining to 'llap' queue. - # Check: State of the selected queue should not be STOPPED. - if llap_daemon_selected_queue_name: - llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) - if llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED": - Logger.error("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default " - "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - else: - Logger.error("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." - .format(llap_daemon_selected_queue_name)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - else: - Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive." - " Not calculating LLAP configs.") - return - - changed_configs_in_hive_int_env = None - llap_concurrency_in_changed_configs = None - llap_daemon_queue_in_changed_configs = None - # Calculations are triggered only if there is change in any one of the following props : - # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' - # or 'hive.llap.daemon.queue.name' has change in value selection. - # OR - # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation) - if 'changed-configurations' in services.keys(): - config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive']) - changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(services, "hive-interactive-env", config_names_to_be_checked, False) - - # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs. - llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(services, self.HIVE_INTERACTIVE_SITE, ['hive.server2.tez.sessions.per.default.queue'], False) - llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(services, self.HIVE_INTERACTIVE_SITE, ['hive.llap.daemon.queue.name'], False) - - if not changed_configs_in_hive_int_env and not llap_concurrency_in_changed_configs and \ - not llap_daemon_queue_in_changed_configs and services["changed-configurations"]: - - return - - node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER") - node_manager_cnt = len(node_manager_host_list) - yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) - total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb - yarn_min_container_size = float(self.get_yarn_min_container_size(services, configurations)) - tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity)) - normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) - - if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site: - cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"]) - else: - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - # Calculate the available memory for LLAP app - yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) - mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host) - - if mem_per_thread_for_llap is None: - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - mem_per_thread_for_llap = float(mem_per_thread_for_llap) - - if not selected_queue_is_ambari_managed_llap: - llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity) - - if llap_daemon_selected_queue_cap <= 0: - Logger.warning("'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format( - llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) + Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive." + " Not calculating LLAP configs.") return - total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size) - num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized) - queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)) - hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized - else: # Ambari managed 'llap' named queue at root level. - num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input - total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized - total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size)) - - # What percent is 'total_llap_mem' of 'total_cluster_capacity' ? - llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100) - - if llap_named_queue_cap_fraction > 100: - Logger.warning("Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) + changed_configs_in_hive_int_env = None + llap_concurrency_in_changed_configs = None + llap_daemon_queue_in_changed_configs = None + # Calculations are triggered only if there is change in any one of the following props : + # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' + # or 'hive.llap.daemon.queue.name' has change in value selection. + # OR + # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation) + if 'changed-configurations' in services.keys(): + config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive']) + changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env", + config_names_to_be_checked, False) + + # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs. + llap_concurrency_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", + set(['hive.server2.tez.sessions.per.default.queue']), False) + llap_daemon_queue_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", + set(['hive.llap.daemon.queue.name']), False) + + if not changed_configs_in_hive_int_env and \ + not llap_concurrency_in_changed_configs and \ + not llap_daemon_queue_in_changed_configs and \ + services["changed-configurations"]: + Logger.info("LLAP parameters not modified. Not adjusting LLAP configs.") + Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"])) return - # Adjust capacity scheduler for the 'llap' named queue. - self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction) - hive_tez_am_cap_available = total_llap_mem_normalized - - # Common calculations now, irrespective of the queue selected. - - # Get calculated value for Slider AM container Size - slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), - yarn_min_container_size) - - llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size - - if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size: - Logger.warning("Not enough capacity available on the cluster to run LLAP") - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - # Calculate llap concurrency (i.e. Number of Tez AM's) - max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) - - # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it. - if not llap_concurrency_in_changed_configs: - if max_executors_per_node <= 0: - Logger.warning("Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - # Default 1 AM for every 20 executor threads. - # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM, - # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is - # instead limited by #CPUs. Use maxPerNode to factor this in. - llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) - llap_concurrency = min(llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size))) - if llap_concurrency == 0: - llap_concurrency = 1 - - if llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available: - llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) - - if llap_concurrency <= 0: - Logger.warning("Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - else: - # Read current value - if 'hive.server2.tez.sessions.per.default.queue' in hsi_site: - llap_concurrency = long(hsi_site['hive.server2.tez.sessions.per.default.queue']) - if llap_concurrency <= 0: - Logger.warning("'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1".format(llap_concurrency)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return + Logger.info("\nPerforming LLAP config calculations ......") + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) + yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) + total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb + Logger.info("Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, " + "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb)) + + yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) + + tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity)) + normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) + cpu_per_nm_host = self.get_cpu_per_nm_host(services) + Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, " + "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size, + total_cluster_capacity)) + + # Calculate the available memory for LLAP app + yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) + mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host) + Logger.info("Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, " + "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)) + + Logger.info("selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap)) + if not selected_queue_is_ambari_managed_llap: + llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity) + assert(llap_daemon_selected_queue_cap > 0, "'{0}' queue capacity percentage retrieved = {1}. " + "Expected > 0.".format(llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap)) + total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size) + Logger.info("Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, " + "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized, + llap_daemon_selected_queue_cap, yarn_min_container_size)) + num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized) + Logger.info("Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, " + "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized)) + queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)) + hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized + Logger.info("Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, " + "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized)) + else: # Ambari managed 'llap' named queue at root level. + num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input + total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized + Logger.info("Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, " + "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized)) + total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size)) + Logger.info("Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, " + "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size)) + # What percent is 'total_llap_mem' of 'total_cluster_capacity' ? + llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100) + assert(llap_named_queue_cap_fraction <= 100), "Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction) + Logger.info("Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction)) + # Adjust capacity scheduler for the 'llap' named queue. + self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction) + hive_tez_am_cap_available = total_llap_mem_normalized + Logger.info("hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available)) + + #Common calculations now, irrespective of the queue selected. + + # Get calculated value for Slider AM container Size + slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), + yarn_min_container_size) + Logger.info("Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : " + "{1}".format(slider_am_container_size, yarn_min_container_size)) + + llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size + assert (llap_mem_for_tezAm_and_daemons >= 2 * yarn_min_container_size), "Not enough capacity available on the cluster to run LLAP" + Logger.info("Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, " + "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size)) + + + # Calculate llap concurrency (i.e. Number of Tez AM's) + max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) + + # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it. + if not llap_concurrency_in_changed_configs: + assert(max_executors_per_node > 0), "Calculated 'max_executors_per_node' = {1}. Expected value >= 1.".format(max_executors_per_node) + Logger.info("Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, " + "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)) + # Default 1 AM for every 20 executor threads. + # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM, + # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is + # instead limited by #CPUs. Use maxPerNode to factor this in. + llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) + Logger.info("Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO " + ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurreny_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES)) + llap_concurrency = min(llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size))) + Logger.info("Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : " + "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : " + "{5}".format(llap_concurrency, llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO, + mem_per_thread_for_llap, normalized_tez_am_container_size)) + if (llap_concurrency == 0): + llap_concurrency = 1 + Logger.info("Adjusted 'llap_concurrency' : 1.") + + if (llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available): + llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) + assert(llap_concurrency > 0), "Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency) + Logger.info("Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: " + "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size)) else: - llap_concurrency = 1 - Logger.warning("Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1.") - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - # Calculate 'Max LLAP Consurrency', irrespective of whether 'llap_concurrency' was read or calculated. - max_llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) - max_llap_concurreny = min(max_llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO * - mem_per_thread_for_llap + normalized_tez_am_container_size))) - if max_llap_concurreny == 0: - max_llap_concurreny = 1 - - if (max_llap_concurreny * normalized_tez_am_container_size) > hive_tez_am_cap_available: - max_llap_concurreny = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) - if max_llap_concurreny <= 0: - Logger.warning("Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value > 1".format(max_llap_concurreny)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return - - # Calculate value for 'num_llap_nodes', an across cluster config. - tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size - llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required - - if llap_mem_daemon_size < yarn_min_container_size: - Logger.warning("Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format( - llap_mem_daemon_size, yarn_min_container_size)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return + # Read current value + if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][ + 'properties']: + llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][ + 'hive.server2.tez.sessions.per.default.queue']) + assert (llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \ + .format(llap_concurrency) + Logger.info("Read 'llap_concurrency' : {0}".format(llap_concurrency )) + else: + raise Fail( + "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.") + + # Calculate 'Max LLAP Consurrency', irrespective of whether 'llap_concurrency' was read or calculated. + max_llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) + Logger.info("Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested " + ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurreny_limit, max_executors_per_node, + num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO, + MAX_CONCURRENT_QUERIES)) + max_llap_concurreny = min(max_llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO * + mem_per_thread_for_llap + normalized_tez_am_container_size))) + Logger.info("Calculated 'max_llap_concurreny' : {0}, using following : max_llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : " + "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : " + "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO, + mem_per_thread_for_llap, normalized_tez_am_container_size)) + if (max_llap_concurreny == 0): + max_llap_concurreny = 1 + Logger.info("Adjusted 'max_llap_concurreny' : 1.") + + if (max_llap_concurreny * normalized_tez_am_container_size > hive_tez_am_cap_available): + max_llap_concurreny = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) + assert(max_llap_concurreny > 0), "Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value > 1".format(max_llap_concurreny) + Logger.info("Adjusted 'max_llap_concurreny' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: " + "{2}".format(max_llap_concurreny, hive_tez_am_cap_available, normalized_tez_am_container_size)) + + + # Calculate value for 'num_llap_nodes', an across cluster config. + tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size + Logger.info("Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : " + "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size)) + llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required + assert (llap_mem_daemon_size >= yarn_min_container_size), "Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container " \ + "Size' ({1})'".format(llap_mem_daemon_size, yarn_min_container_size) + assert(llap_mem_daemon_size >= mem_per_thread_for_llap or llap_mem_daemon_size >= yarn_min_container_size), "Not enough memory available for executors." + Logger.info("Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : " + "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required)) + + llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size) + Logger.info("Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, " + "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size)) + if (llap_daemon_mem_per_node == 0): + # Small cluster. No capacity left on a node after running AMs. + llap_daemon_mem_per_node = mem_per_thread_for_llap + num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) + Logger.info("'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, " + "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap)) + elif (llap_daemon_mem_per_node < mem_per_thread_for_llap): + # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node) + llap_daemon_mem_per_node = mem_per_thread_for_llap + num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) + Logger.info("'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' " + ": {2}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node)) + else: + # All good. We have a proper value for memoryPerNode. + num_llap_nodes = num_llap_nodes_requested + Logger.info("num_llap_nodes : {0}".format(num_llap_nodes)) + + num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) + assert(num_executors_per_node_max >= 1), "Calculated 'Max. Executors per Node' = {0}. Expected values >= 1.".format(num_executors_per_node_max) + Logger.info("Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, " + "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)) + + # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem. + num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max) + assert(num_executors_per_node > 0), "Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node) + Logger.info("Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, " + "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap)) + + # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache. + total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap + cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node + + tez_runtime_io_sort_mb = ((long)((0.8 * mem_per_thread_for_llap) / 3)) + tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap) + # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576. + hive_auto_convert_join_noconditionaltask_size = ((long)((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES + + # Calculate value for prop 'llap_heap_size' + llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations)) + Logger.info("Calculated llap_app_heap_size : {0}, using following : total_mem_for_executors : {1}".format(llap_xmx, total_mem_for_executors_per_node)) + + # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI) + hive_server_interactive_heapsize = None + hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts) + if hive_server_interactive_hosts is None: + # If its None, read the base service YARN's NODEMANAGER node memory, as are host are considered homogenous. + hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) + if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0: + host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"]) + hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem) + Logger.info("Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : " + "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem)) + + + Logger.info("Updating the calculations....") + + # Done with calculations, updating calculated configs. + + normalized_tez_am_container_size = long(normalized_tez_am_container_size) + putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size) + Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format(normalized_tez_am_container_size)) + + if not llap_concurrency_in_changed_configs: + min_llap_concurrency = 1 + putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", + min_llap_concurrency) + + Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}" \ + .format(min_llap_concurrency, llap_concurrency)) + + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny) + Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Max : {0}".format(max_llap_concurreny)) + + num_llap_nodes = long(num_llap_nodes) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) + if (num_llap_nodes != num_llap_nodes_requested): + Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes)) + else: + Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested)) + Logger.info("LLAP config 'num_llap_nodes' updated. Min: 1, Max: {0}".format(node_manager_cnt)) - if llap_mem_daemon_size < mem_per_thread_for_llap or llap_mem_daemon_size < yarn_min_container_size: - Logger.warning("Not enough memory available for executors.") - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return + llap_container_size = long(llap_daemon_mem_per_node) + putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size) + Logger.info("LLAP config 'hive.llap.daemon.yarn.container.mb' updated. Current: {0}".format(llap_container_size)) - llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size) - if llap_daemon_mem_per_node == 0: - # Small cluster. No capacity left on a node after running AMs. - llap_daemon_mem_per_node = mem_per_thread_for_llap - num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) - elif llap_daemon_mem_per_node < mem_per_thread_for_llap: - # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node) - llap_daemon_mem_per_node = mem_per_thread_for_llap - num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) - else: - # All good. We have a proper value for memoryPerNode. - num_llap_nodes = num_llap_nodes_requested + # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization. + # Else, we don't (1). Override the previous calculated value or (2). User provided value. + if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED: + mem_per_thread_for_llap = long(mem_per_thread_for_llap) + putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap) + Logger.info("LLAP config 'hive.tez.container.size' updated. Current: {0}".format(mem_per_thread_for_llap)) - num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) - if num_executors_per_node_max < 1: - Logger.warning("Calculated 'Max. Executors per Node' = {0}. Expected values >= 1.".format(num_executors_per_node_max)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return + putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb) + if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]: + if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY": + putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800) + Logger.info("'Tez for Hive2' config 'tez.runtime.io.sort.mb' updated. Current: {0}".format(tez_runtime_io_sort_mb)) - # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem. - num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max) - if num_executors_per_node <= 0: - Logger.warning("Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node)) - self.recommendDefaultLlapConfiguration(configurations, services, hosts) - return + putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size) + Logger.info("'Tez for Hive2' config 'tez.runtime.unordered.output.buffer.size-mb' updated. Current: {0}".format(tez_runtime_unordered_output_buffer_size)) - # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache. - total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap - cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node + putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size) + Logger.info("HIVE2 config 'hive.auto.convert.join.noconditionaltask.size' updated. Current: {0}".format(hive_auto_convert_join_noconditionaltask_size)) - tez_runtime_io_sort_mb = (long((0.8 * mem_per_thread_for_llap) / 3)) - tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap) - # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576. - hive_auto_convert_join_noconditionaltask_size = (long((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES - # Calculate value for prop 'llap_heap_size' - llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations)) + num_executors_per_node = long(num_executors_per_node) + putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", int(num_executors_per_node_max)) + Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}, Min: 1, " + "Max: {1}".format(num_executors_per_node, int(num_executors_per_node_max))) + # 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for + # 'hive.llap.daemon.num.executors' at all times. + putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node) + Logger.info("LLAP config 'hive.llap.io.threadpool.size' updated. Current: {0}".format(num_executors_per_node)) - # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI) - hive_server_interactive_heapsize = None - hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts) - if hive_server_interactive_hosts is None: - # If its None, read the base service YARN's NODEMANAGER node memory, as are host are considered homogenous. - hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) - if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0: - host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"]) - hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem) + cache_mem_per_node = long(cache_mem_per_node) + putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node) + Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_mem_per_node)) + llap_io_enabled = 'false' + if cache_mem_per_node >= 64: + llap_io_enabled = 'true' - # Done with calculations, updating calculated configs. + if hive_server_interactive_heapsize != None: + putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize)) + Logger.info("Hive2 config 'hive_heapsize' updated. Current : {0}".format(int(hive_server_interactive_heapsize))) - normalized_tez_am_container_size = long(normalized_tez_am_container_size) - putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size) + putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled) + Logger.info("Hive2 config 'hive.llap.io.enabled' updated to '{0}' as part of " + "'hive.llap.io.memory.size' calculation.".format(llap_io_enabled)) - if not llap_concurrency_in_changed_configs: - min_llap_concurrency = 1 - putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency) - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", - min_llap_concurrency) + llap_xmx = long(llap_xmx) + putHiveInteractiveEnvProperty('llap_heap_size', llap_xmx) + Logger.info("LLAP config 'llap_heap_size' updated. Current: {0}".format(llap_xmx)) - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny) + slider_am_container_size = long(slider_am_container_size) + putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) + Logger.info("LLAP config 'slider_am_container_mb' updated. Current: {0}".format(slider_am_container_size)) - num_llap_nodes = long(num_llap_nodes) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) + except Exception as e: + # Set default values, if caught an Exception. The 'llap queue capacity' is left untouched, as it can be increased, + # triggerring recalculation. + Logger.info(e.message+" Skipping calculating LLAP configs. Setting them to minimum values.") + traceback.print_exc() - llap_container_size = long(llap_daemon_mem_per_node) - putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size) + try: + yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations)) + slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size)) - # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization. - # Else, we don't (1). Override the previous calculated value or (2). User provided value. - if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED: - mem_per_thread_for_llap = long(mem_per_thread_for_llap) - putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap) + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) - putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb) - if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]: - if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY": - putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800) + putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1) - putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size) - putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size) + putHiveInteractiveEnvProperty('num_llap_nodes', 0) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) - num_executors_per_node = long(num_executors_per_node) - putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", float(num_executors_per_node_max)) + putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size) - # 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for - # 'hive.llap.daemon.num.executors' at all times. - cache_mem_per_node = long(cache_mem_per_node) + putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) - putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node) - putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node) + putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0) - if hive_server_interactive_heapsize is not None: - putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize)) + putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0) - llap_io_enabled = 'true' if long(cache_mem_per_node) >= 64 else 'false' - putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled) + putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0) - putHiveInteractiveEnvProperty('llap_heap_size', long(llap_xmx)) - putHiveInteractiveEnvProperty('slider_am_container_mb', long(slider_am_container_size)) + putHiveInteractiveEnvProperty('llap_heap_size', 0) - def recommendDefaultLlapConfiguration(self, configurations, services, hosts): - putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) - putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) + putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) - putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) - putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") + except Exception as e: + Logger.info("Problem setting minimum values for LLAP configs in Exception code.") + traceback.print_exc() - yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations)) - slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size)) - - node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER") - node_manager_cnt = len(node_manager_host_list) - - putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1) - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1) - putHiveInteractiveEnvProperty('num_llap_nodes', 0) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) - putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size) - putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors
<TRUNCATED>
