Repository: ambari Updated Branches: refs/heads/branch-feature-AMBARI-18901 8be71baf4 -> 4fcc49383
http://git-wip-us.apache.org/repos/asf/ambari/blob/4fcc4938/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py index 98c93bf..f9a3a9a 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py @@ -33,7 +33,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor): self.HIVE_INTERACTIVE_SITE = 'hive-interactive-site' self.YARN_ROOT_DEFAULT_QUEUE_NAME = 'default' self.AMBARI_MANAGED_LLAP_QUEUE_NAME = 'llap' - self.CONFIG_VALUE_UINITIALIZED = 'SET_ON_FIRST_INVOCATION' def recommendOozieConfigurations(self, configurations, clusterData, services, hosts): super(HDP25StackAdvisor,self).recommendOozieConfigurations(configurations, clusterData, services, hosts) @@ -326,7 +325,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if num_tez_sessions: num_tez_sessions = long(num_tez_sessions) yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) - tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity)) + tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions) if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2: @@ -663,7 +662,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): # Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed. 
if self.HIVE_INTERACTIVE_SITE in services['configurations']: if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: - self.setLlapDaemonQueuePropAttributes(services, configurations) + self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) # Update 'hive.server2.tez.default.queues' value hive_tez_default_queue = None @@ -679,7 +678,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue)) else: putHiveInteractiveEnvProperty('enable_hive_interactive', 'false') - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ 'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: @@ -700,6 +699,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): # Hive Server interactive is already added or getting added if enable_hive_interactive == 'true': + self.checkAndManageLlapQueue(services, configurations, hosts, LLAP_QUEUE_NAME) self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME) else: # When Hive Interactive Server is in 'off/removed' state. self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME) @@ -724,22 +724,17 @@ class HDP25StackAdvisor(HDP24StackAdvisor): """ Entry point for updating Hive's 'LLAP app' configs namely : (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb - (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb, - (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size - (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and - (13). hive.llap.io.enabled. 
+ (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb, + and (7). hive.server2.tez.sessions.per.default.queue The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following: - (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue' + (1). 'enable_hive_interactive' set to 'true' (2). 'llap_queue_capacity' (3). 'hive.server2.tez.sessions.per.default.queue' (4). Change in queue selection for config 'hive.llap.daemon.queue.name'. - If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config + If change in value for 'llap_queue_capacity' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config value is not calculated, but read and used in calculation for dependent configs. - - Note: All memory caluclations are in MB, unless specified otherwise. """ def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name): - Logger.info("Entered updateLlapConfigs() ..") putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) @@ -749,16 +744,11 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services) llap_daemon_selected_queue_name = None - selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed. - llap_selected_queue_am_percent = None - DEFAULT_EXECUTOR_TO_AM_RATIO = 20 - MIN_EXECUTOR_TO_AM_RATIO = 10 - MAX_CONCURRENT_QUERIES = 32 - leafQueueNames = None - MB_TO_BYTES = 1048576 + llap_queue_selected_in_current_call = None + LLAP_MAX_CONCURRENCY = 32 # Allow a max of 32 concurrency. 
- # Update 'hive.llap.daemon.queue.name' prop combo entries - self.setLlapDaemonQueuePropAttributes(services, configurations) + # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility. + self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) if not services["changed-configurations"]: read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations)) @@ -772,48 +762,33 @@ class HDP25StackAdvisor(HDP24StackAdvisor): 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] - # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'. + if 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: + llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + + # Update Visibility of 'llap_queue_capacity' slider. capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) if capacity_scheduler_properties: # Get all leaf queues. leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) - Logger.info("YARN leaf Queues = {0}".format(leafQueueNames)) - - # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive). 
- changed_configs_has_enable_hive_int = self.are_config_props_in_changed_configs(services, "hive-interactive-env", - set(['enable_hive_interactive']), False) - llap_named_queue_selected_in_curr_invocation = False - if changed_configs_has_enable_hive_int and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']: - llap_named_queue_selected_in_curr_invocation = True - putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) - putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name) - Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(llap_queue_name)) - Logger.info("llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation)) - - if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \ - llap_named_queue_selected_in_curr_invocation) or \ - (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation): - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true") - Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " - "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames))) - selected_queue_is_ambari_managed_llap = True + if len(leafQueueNames) == 2 and \ + (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \ + (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == llap_queue_name): + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true") + Logger.info("Selected YARN queue is '{0}'. 
Setting LLAP queue capacity slider visibility to 'True'".format(llap_queue_name)) else: - putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false") - Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' " - "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames))) - selected_queue_is_ambari_managed_llap = False - - if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have - # state information pertaining to 'llap' queue. - # Check: State of the selected queue should not be STOPPED. - if llap_daemon_selected_queue_name: - llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) - if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED": - raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default " - "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state)) - else: - raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." - .format(llap_daemon_selected_queue_name)) + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + Logger.info("Queue selected for LLAP app is : '{0}'. Current YARN queues : {1}. Setting '{2}' queue capacity slider " + "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames), llap_queue_name)) + if llap_daemon_selected_queue_name: + llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) + if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED": + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. 
Setting LLAP configs to default values " + "and 'llap' queue capacity slider visibility to 'False'." + .format(llap_daemon_selected_queue_name, llap_selected_queue_state)) + else: + raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." + .format(llap_daemon_selected_queue_name)) else: Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive." " Not calculating LLAP configs.") @@ -823,12 +798,12 @@ class HDP25StackAdvisor(HDP24StackAdvisor): llap_concurrency_in_changed_configs = None llap_daemon_queue_in_changed_configs = None # Calculations are triggered only if there is change in any one of the following props : - # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' + # 'llap_queue_capacity', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' # or 'hive.llap.daemon.queue.name' has change in value selection. # OR # services['changed-configurations'] is empty implying that this is the Blueprint call. 
(1st invocation) if 'changed-configurations' in services.keys(): - config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive']) + config_names_to_be_checked = set(['llap_queue_capacity', 'enable_hive_interactive']) changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env", config_names_to_be_checked, False) @@ -846,265 +821,180 @@ class HDP25StackAdvisor(HDP24StackAdvisor): Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"])) return - Logger.info("\nPerforming LLAP config calculations ......") node_manager_host_list = self.get_node_manager_hosts(services, hosts) node_manager_cnt = len(node_manager_host_list) yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb - Logger.info("Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, " + Logger.info("\n\nCalculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, " "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb)) + # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity + current_selected_queue_for_llap_cap = None + yarn_root_queues = capacity_scheduler_properties.get("yarn.scheduler.capacity.root.queues") + if llap_queue_selected_in_current_call == llap_queue_name \ + or llap_daemon_selected_queue_name == llap_queue_name \ + and (llap_queue_name in yarn_root_queues and len(leafQueueNames) == 2): + current_selected_queue_for_llap_cap_perc = self.get_llap_cap_percent_slider(services, configurations) + current_selected_queue_for_llap_cap = current_selected_queue_for_llap_cap_perc / 100 * total_cluster_capacity + else: # any queue other than 'llap' + current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, + 
llap_daemon_selected_queue_name, total_cluster_capacity) + assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. Expected value : >= 1" \ + .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap) yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) - - tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity)) + tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) - cpu_per_nm_host = self.get_cpu_per_nm_host(services) Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, " "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size, total_cluster_capacity)) - - # Calculate the available memory for LLAP app - yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) - mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host) - Logger.info("Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, " - "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)) - - Logger.info("selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap)) - if not selected_queue_is_ambari_managed_llap: - llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity) - assert(llap_daemon_selected_queue_cap > 0, "'{0}' queue capacity percentage retrieved = {1}. 
" - "Expected > 0.".format(llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap)) - total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size) - Logger.info("Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, " - "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized, - llap_daemon_selected_queue_cap, yarn_min_container_size)) - num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized) - Logger.info("Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, " - "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized)) - queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)) - hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized - Logger.info("Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, " - "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized)) - else: # Ambari managed 'llap' named queue at root level. 
- num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input - total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized - Logger.info("Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, " - "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized)) - total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size)) - Logger.info("Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, " - "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size)) - # What percent is 'total_llap_mem' of 'total_cluster_capacity' ? - llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100) - assert(llap_named_queue_cap_fraction <= 100), "Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction) - Logger.info("Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction)) - # Adjust capacity scheduler for the 'llap' named queue. - self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction) - hive_tez_am_cap_available = total_llap_mem_normalized - Logger.info("hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available)) - - #Common calculations now, irrespective of the queue selected. 
+ normalized_selected_queue_for_llap_cap = long(self._normalizeDown(current_selected_queue_for_llap_cap, yarn_min_container_size)) # Get calculated value for Slider AM container Size slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), yarn_min_container_size) - Logger.info("Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : " - "{1}".format(slider_am_container_size, yarn_min_container_size)) - - llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size - assert (llap_mem_for_tezAm_and_daemons >= 2 * yarn_min_container_size), "Not enough capacity available on the cluster to run LLAP" - Logger.info("Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, " - "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size)) - - - # Calculate llap concurrency (i.e. Number of Tez AM's) - max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it. if not llap_concurrency_in_changed_configs: - assert(max_executors_per_node > 0), "Calculated 'max_executors_per_node' = {1}. Expected value >= 1.".format(max_executors_per_node) - Logger.info("Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, " - "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)) - # Default 1 AM for every 20 executor threads. - # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM, - # making use of total memory. 
However, it's possible that total memory will not be used - and the numExecutors is - # instead limited by #CPUs. Use maxPerNode to factor this in. - llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) - Logger.info("Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO " - ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurreny_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES)) - llap_concurrency = min(llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size))) - Logger.info("Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : " - "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : " - "{5}".format(llap_concurrency, llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO, - mem_per_thread_for_llap, normalized_tez_am_container_size)) - if (llap_concurrency == 0): - llap_concurrency = 1 - Logger.info("Adjusted 'llap_concurrency' : 1.") - - if (llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available): - llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) - assert(llap_concurrency > 0), "Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency) - Logger.info("Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: " - "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size)) + # Calculate llap concurrency (i.e. 
Number of Tez AM's) + llap_concurrency = float(normalized_selected_queue_for_llap_cap * 0.25 / normalized_tez_am_container_size) + llap_concurrency = max(long(llap_concurrency), 1) + Logger.info("Calculated llap_concurrency : {0}, using following : normalized_selected_queue_for_llap_cap : {1}, " + "normalized_tez_am_container_size : {2}".format(llap_concurrency, normalized_selected_queue_for_llap_cap, + normalized_tez_am_container_size)) + # Limit 'llap_concurrency' to reach a max. of 32. + if llap_concurrency > LLAP_MAX_CONCURRENCY: + llap_concurrency = LLAP_MAX_CONCURRENCY else: # Read current value if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][ 'properties']: llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][ 'hive.server2.tez.sessions.per.default.queue']) - assert (llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \ + assert ( + llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \ .format(llap_concurrency) - Logger.info("Read 'llap_concurrency' : {0}".format(llap_concurrency )) else: raise Fail( "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.") - # Calculate 'Max LLAP Consurrency', irrespective of whether 'llap_concurrency' was read or calculated. 
- max_llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES) - Logger.info("Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested " - ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurreny_limit, max_executors_per_node, - num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO, - MAX_CONCURRENT_QUERIES)) - max_llap_concurreny = min(max_llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO * - mem_per_thread_for_llap + normalized_tez_am_container_size))) - Logger.info("Calculated 'max_llap_concurreny' : {0}, using following : max_llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : " - "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : " - "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO, - mem_per_thread_for_llap, normalized_tez_am_container_size)) - if (max_llap_concurreny == 0): - max_llap_concurreny = 1 - Logger.info("Adjusted 'max_llap_concurreny' : 1.") - - if (max_llap_concurreny * normalized_tez_am_container_size > hive_tez_am_cap_available): - max_llap_concurreny = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size) - assert(max_llap_concurreny > 0), "Calculated 'Max. LLAP Concurrent Queries' = {0}. 
Expected value > 1".format(max_llap_concurreny) - Logger.info("Adjusted 'max_llap_concurreny' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: " - "{2}".format(max_llap_concurreny, hive_tez_am_cap_available, normalized_tez_am_container_size)) + + # Calculate 'total memory available for llap daemons' across cluster + total_am_capacity_required = normalized_tez_am_container_size * llap_concurrency + slider_am_container_size + cap_available_for_daemons = normalized_selected_queue_for_llap_cap - total_am_capacity_required + Logger.info( + "Calculated cap_available_for_daemons : {0}, using following : current_selected_queue_for_llap_cap : {1}, " + "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, normalized_selected_queue_for_llap_cap : {4}, normalized_tez_am_container_size" + " : {5}, yarn_min_container_size : {6}, llap_concurrency : {7}, total_am_capacity_required : {8}" + .format(cap_available_for_daemons, current_selected_queue_for_llap_cap, yarn_nm_mem_in_mb, + total_cluster_capacity, + normalized_selected_queue_for_llap_cap, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency, + total_am_capacity_required)) + if cap_available_for_daemons < yarn_min_container_size: + raise Fail( + "'Capacity available for LLAP daemons'({0}) < 'YARN minimum container size'({1}). Invalid configuration detected. " + "Increase LLAP queue size.".format(cap_available_for_daemons, yarn_min_container_size)) + # Calculate value for 'num_llap_nodes', an across cluster config. 
- tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size - Logger.info("Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : " - "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size)) - llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required - assert (llap_mem_daemon_size >= yarn_min_container_size), "Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container " \ - "Size' ({1})'".format(llap_mem_daemon_size, yarn_min_container_size) - assert(llap_mem_daemon_size >= mem_per_thread_for_llap or llap_mem_daemon_size >= yarn_min_container_size), "Not enough memory available for executors." - Logger.info("Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : " - "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required)) - - llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size) - Logger.info("Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, " - "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size)) - if (llap_daemon_mem_per_node == 0): - # Small cluster. No capacity left on a node after running AMs. 
- llap_daemon_mem_per_node = mem_per_thread_for_llap - num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) - Logger.info("'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, " - "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap)) - elif (llap_daemon_mem_per_node < mem_per_thread_for_llap): - # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node) - llap_daemon_mem_per_node = mem_per_thread_for_llap - num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap) - Logger.info("'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' " - ": {2}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node)) + # Also, get calculated value for 'hive.llap.daemon.yarn.container.mb' based on 'num_llap_nodes' value, a per node config. + num_llap_nodes_raw = cap_available_for_daemons / yarn_nm_mem_in_mb + if num_llap_nodes_raw < 1.00: + # Set the llap nodes to min. value of 1 and 'llap_container_size' to min. YARN allocation. 
+ num_llap_nodes = 1 + llap_container_size = self._normalizeUp(cap_available_for_daemons, yarn_min_container_size) + Logger.info("Calculated llap_container_size : {0}, using following : cap_available_for_daemons : {1}, " + "yarn_min_container_size : {2}".format(llap_container_size, cap_available_for_daemons, + yarn_min_container_size)) + else: + num_llap_nodes = math.floor(num_llap_nodes_raw) + llap_container_size = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) + Logger.info("Calculated llap_container_size : {0}, using following : yarn_nm_mem_in_mb : {1}, " + "yarn_min_container_size : {2}".format(llap_container_size, yarn_nm_mem_in_mb, + yarn_min_container_size)) + Logger.info( + "Calculated num_llap_nodes : {0} using following : yarn_nm_mem_in_mb : {1}, cap_available_for_daemons : {2} " \ + .format(num_llap_nodes, yarn_nm_mem_in_mb, cap_available_for_daemons)) + + + # Calculate value for 'hive.llap.daemon.num.executors', a per node config. + hive_tez_container_size = self.get_hive_tez_container_size(services, configurations) + if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']: + cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][ + 'yarn.nodemanager.resource.cpu-vcores']) + assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \ + .format(cpu_per_nm_host) else: - # All good. We have a proper value for memoryPerNode. - num_llap_nodes = num_llap_nodes_requested - Logger.info("num_llap_nodes : {0}".format(num_llap_nodes)) - - num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap) - assert(num_executors_per_node_max >= 1), "Calculated 'Max. Executors per Node' = {0}. 
Expected values >= 1.".format(num_executors_per_node_max) - Logger.info("Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, " - "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)) - - # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem. - num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max) - assert(num_executors_per_node > 0), "Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node) - Logger.info("Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, " - "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap)) - - # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache. - total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap - cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node - - tez_runtime_io_sort_mb = ((long)((0.8 * mem_per_thread_for_llap) / 3)) - tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap) - # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576. 
- hive_auto_convert_join_noconditionaltask_size = ((long)((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES + raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.") + + num_executors_per_node_raw = math.floor(llap_container_size / hive_tez_container_size) + num_executors_per_node = min(num_executors_per_node_raw, cpu_per_nm_host) + Logger.info("calculated num_executors_per_node: {0}, using following : hive_tez_container_size : {1}, " + "cpu_per_nm_host : {2}, num_executors_per_node_raw : {3}, llap_container_size : {4}" + .format(num_executors_per_node, hive_tez_container_size, cpu_per_nm_host, num_executors_per_node_raw, + llap_container_size)) + assert (num_executors_per_node >= 0), "'Number of executors per node' : {0}. Expected value : > 0".format( + num_executors_per_node) + + total_mem_for_executors = num_executors_per_node * hive_tez_container_size + + # Calculate value for 'cache' (hive.llap.io.memory.size), a per node config. + cache_size_per_node = llap_container_size - total_mem_for_executors + Logger.info( + "Calculated cache_size_per_node : {0} using following : hive_container_size : {1}, llap_container_size" + " : {2}, num_executors_per_node : {3}" + .format(cache_size_per_node, hive_tez_container_size, llap_container_size, num_executors_per_node)) + if cache_size_per_node < 0: # Run with '0' cache. + Logger.info( + "Calculated 'cache_size_per_node' : {0}. 
Setting 'cache_size_per_node' to 0.".format(cache_size_per_node)) + cache_size_per_node = 0 - # Calculate value for prop 'llap_heap_size' - llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations)) - Logger.info("Calculated llap_app_heap_size : {0}, using following : total_mem_for_executors : {1}".format(llap_xmx, total_mem_for_executors_per_node)) - Logger.info("Updating the calculations....") + # Calculate value for prop 'llap_heap_size' + llap_xmx = max(total_mem_for_executors * 0.8, total_mem_for_executors - self.get_llap_headroom_space(services, configurations)) + Logger.info("Calculated llap_app_heap_size : {0}, using following : hive_container_size : {1}, " + "total_mem_for_executors : {2}".format(llap_xmx, hive_tez_container_size, total_mem_for_executors)) - # Done with calculations, updating calculated configs. + # Updating calculated configs. normalized_tez_am_container_size = long(normalized_tez_am_container_size) putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size) - Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format(normalized_tez_am_container_size)) + Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format( + normalized_tez_am_container_size)) if not llap_concurrency_in_changed_configs: min_llap_concurrency = 1 putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency) putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", min_llap_concurrency) - - Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. 
Min : {0}, Current: {1}" \ - .format(min_llap_concurrency, llap_concurrency)) - - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny) - Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Max : {0}".format(max_llap_concurreny)) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", + LLAP_MAX_CONCURRENCY) + Logger.info( + "Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}, Max: {2}" \ + .format(min_llap_concurrency, llap_concurrency, LLAP_MAX_CONCURRENCY)) num_llap_nodes = long(num_llap_nodes) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) - putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) - if (num_llap_nodes != num_llap_nodes_requested): - Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes)) - else: - Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested)) - Logger.info("LLAP config 'num_llap_nodes' updated. Min: 1, Max: {0}".format(node_manager_cnt)) - llap_container_size = long(llap_daemon_mem_per_node) + putHiveInteractiveEnvProperty('num_llap_nodes', num_llap_nodes) + Logger.info("LLAP config 'num_llap_nodes' updated. Current: {0}".format(num_llap_nodes)) + + llap_container_size = long(llap_container_size) putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size) Logger.info("LLAP config 'hive.llap.daemon.yarn.container.mb' updated. Current: {0}".format(llap_container_size)) - # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization. - # Else, we don't (1). Override the previous calculated value or (2). User provided value. 
- if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED: - mem_per_thread_for_llap = long(mem_per_thread_for_llap) - putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap) - Logger.info("LLAP config 'hive.tez.container.size' updated. Current: {0}".format(mem_per_thread_for_llap)) - - putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb) - if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]: - if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY": - putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800) - Logger.info("'Tez for Hive2' config 'tez.runtime.io.sort.mb' updated. Current: {0}".format(tez_runtime_io_sort_mb)) - - putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size) - Logger.info("'Tez for Hive2' config 'tez.runtime.unordered.output.buffer.size-mb' updated. Current: {0}".format(tez_runtime_unordered_output_buffer_size)) - - putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size) - Logger.info("HIVE2 config 'hive.auto.convert.join.noconditionaltask.size' updated. Current: {0}".format(hive_auto_convert_join_noconditionaltask_size)) - - num_executors_per_node = long(num_executors_per_node) putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) - putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", long(num_executors_per_node_max)) - Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. 
Current: {0}, Min: 1, " - "Max: {1}".format(num_executors_per_node, num_executors_per_node_max)) + Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}".format(num_executors_per_node)) # 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for # 'hive.llap.daemon.num.executors' at all times. putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node) Logger.info("LLAP config 'hive.llap.io.threadpool.size' updated. Current: {0}".format(num_executors_per_node)) - cache_mem_per_node = long(cache_mem_per_node) - putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node) - Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_mem_per_node)) + cache_size_per_node = long(cache_size_per_node) + putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_size_per_node) + Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_size_per_node)) llap_io_enabled = 'false' - if cache_mem_per_node >= 64: + if cache_size_per_node >= 64: llap_io_enabled = 'true' putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled) @@ -1134,7 +1024,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1) putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) - putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 32) putHiveInteractiveEnvProperty('num_llap_nodes', 0) putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) @@ -1203,84 +1093,57 @@ class HDP25StackAdvisor(HDP24StackAdvisor): return node_manager_hosts """ - Returns current value of number of LLAP nodes in cluster (num_llap_nodes) + Returns the current LLAP queue capacity 
percentage value. (llap_queue_capacity) """ - def get_num_llap_nodes(self, services, configurations): - num_llap_nodes = None - # Check if 'num_llap_nodes' is modified in current ST invocation. - if 'hive-interactive-env' in configurations and 'num_llap_nodes' in configurations['hive-interactive-env']['properties']: - num_llap_nodes = float(configurations['hive-interactive-env']['properties']['num_llap_nodes']) - Logger.info("'num_llap_nodes' read from configurations as : {0}".format(num_llap_nodes)) - - if num_llap_nodes is None: - # Check if 'num_llap_nodes' is input in services array. - if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']: - num_llap_nodes = float(services['configurations']['hive-interactive-env']['properties']['num_llap_nodes']) - Logger.info("'num_llap_nodes' read from services as : {0}".format(num_llap_nodes)) - - if num_llap_nodes is None: - raise Fail("Couldn't retrieve Hive Server 'num_llap_nodes' config.") - assert (num_llap_nodes > 0), "'num_llap_nodes' current value : {0}. Expected value : > 0".format(num_llap_nodes) - - return num_llap_nodes - + def get_llap_cap_percent_slider(self, services, configurations): + llap_slider_cap_percentage = 0 + if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']: + llap_slider_cap_percentage = float( + services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity']) + Logger.error("'llap_queue_capacity' not present in services['configurations']['hive-interactive-env']['properties'].") + if llap_slider_cap_percentage <= 0 : + if 'hive-interactive-env' in configurations and \ + 'llap_queue_capacity' in configurations["hive-interactive-env"]["properties"]: + llap_slider_cap_percentage = float(configurations["hive-interactive-env"]["properties"]["llap_queue_capacity"]) + assert (llap_slider_cap_percentage > 0), "'llap_queue_capacity' is set to : {0}. 
Should be > 0.".format(llap_slider_cap_percentage) + return llap_slider_cap_percentage - def get_max_executors_per_node(self, nm_mem_per_node_normalized, nm_cpus_per_node, mem_per_thread): - # TODO: This potentially takes up the entire node leaving no space for AMs. - return min(math.floor(nm_mem_per_node_normalized / mem_per_thread), nm_cpus_per_node) """ - Calculates 'mem_per_thread_for_llap' for 1st time initialization. Else returns 'hive.tez.container.size' read value. - """ - def calculate_mem_per_thread_for_llap(self, services, nm_mem_per_node_normalized, cpu_per_nm_host): - hive_tez_container_size = self.get_hive_tez_container_size(services) - calculated_hive_tez_container_size = None - if hive_tez_container_size == self.CONFIG_VALUE_UINITIALIZED: - if nm_mem_per_node_normalized <= 1024: - calculated_hive_tez_container_size = min(512, nm_mem_per_node_normalized) - elif nm_mem_per_node_normalized <= 4096: - calculated_hive_tez_container_size = 1024 - elif nm_mem_per_node_normalized <= 10240: - calculated_hive_tez_container_size = 2048 - elif nm_mem_per_node_normalized <= 24576: - calculated_hive_tez_container_size = 3072 - else: - calculated_hive_tez_container_size = 4096 - Logger.info("Calculated and returning 'hive_tez_container_size' : {0}".format(calculated_hive_tez_container_size)) - return float(calculated_hive_tez_container_size) - else: - Logger.info("Returning 'hive_tez_container_size' : {0}".format(hive_tez_container_size)) - return float(hive_tez_container_size) - - """ - Read YARN config 'yarn.nodemanager.resource.cpu-vcores'. 
+ Returns current value of number of LLAP nodes in cluster (num_llap_nodes) """ - def get_cpu_per_nm_host(self, services): - cpu_per_nm_host = None - - if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']: - cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][ - 'yarn.nodemanager.resource.cpu-vcores']) - assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \ - .format(cpu_per_nm_host) + def get_num_llap_nodes(self, services): + if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']: + num_llap_nodes = float( + services['configurations']['hive-interactive-env']['properties']['num_llap_nodes']) + assert (num_llap_nodes > 0), "Number of LLAP nodes read : {0}. Expected value : > 0".format( + num_llap_nodes) + return num_llap_nodes else: - raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.") - return cpu_per_nm_host + raise Fail("Couldn't retrieve Hive Server interactive's 'num_llap_nodes' config.") """ - Gets HIVE Tez container size (hive.tez.container.size). + Gets HIVE Tez container size (hive.tez.container.size). Takes into account if it has been calculated as part of current + Stack Advisor invocation. """ - def get_hive_tez_container_size(self, services): + def get_hive_tez_container_size(self, services, configurations): hive_container_size = None - if 'hive.tez.container.size' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: - hive_container_size = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.tez.container.size'] - Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size)) - - if hive_container_size is None: + # Check if 'hive.tez.container.size' is modified in current ST invocation. 
+ if 'hive-site' in configurations and 'hive.tez.container.size' in configurations['hive-site']['properties']: + hive_container_size = float(configurations['hive-site']['properties']['hive.tez.container.size']) + Logger.info("'hive.tez.container.size' read from configurations as : {0}".format(hive_container_size)) + + if not hive_container_size: + # Check if 'hive.tez.container.size' is input in services array. + if 'hive.tez.container.size' in services['configurations']['hive-site']['properties']: + hive_container_size = float(services['configurations']['hive-site']['properties']['hive.tez.container.size']) + Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size)) + if not hive_container_size: raise Fail("Couldn't retrieve Hive Server 'hive.tez.container.size' config.") - if hive_container_size != self.CONFIG_VALUE_UINITIALIZED: - assert (hive_container_size >= 0), "'hive.tez.container.size' current value : {0}. " \ - "Expected value : >= 0".format(hive_container_size) + + assert (hive_container_size > 0), "'hive.tez.container.size' current value : {0}. Expected value : > 0".format( + hive_container_size) + return hive_container_size """ @@ -1293,7 +1156,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): hive_container_size = float(configurations['hive-interactive-env']['properties']['llap_headroom_space']) Logger.info("'llap_headroom_space' read from configurations as : {0}".format(llap_headroom_space)) - if llap_headroom_space is None: + if not llap_headroom_space: # Check if 'llap_headroom_space' is input in services array. 
if 'llap_headroom_space' in services['configurations']['hive-interactive-env']['properties']: llap_headroom_space = float(services['configurations']['hive-interactive-env']['properties']['llap_headroom_space']) @@ -1330,7 +1193,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): yarn_min_container_size = float(services['configurations']['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb']) Logger.info("'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size)) - if yarn_min_container_size is None: + if not yarn_min_container_size: raise Fail("Couldn't retrieve YARN's 'yarn.scheduler.minimum-allocation-mb' config.") assert (yarn_min_container_size > 0), "'yarn.scheduler.minimum-allocation-mb' current value : {0}. " \ @@ -1368,14 +1231,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor): yarn_nm_mem_in_mb = float(configurations['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb']) Logger.info("'yarn.nodemanager.resource.memory-mb' read from configurations as : {0}".format(yarn_nm_mem_in_mb)) - if yarn_nm_mem_in_mb is None: + if not yarn_nm_mem_in_mb: # Check if 'yarn.nodemanager.resource.memory-mb' is input in services array. if 'yarn-site' in services['configurations'] and \ 'yarn.nodemanager.resource.memory-mb' in services['configurations']['yarn-site']['properties']: yarn_nm_mem_in_mb = float(services['configurations']['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb']) Logger.info("'yarn.nodemanager.resource.memory-mb' read from services as : {0}".format(yarn_nm_mem_in_mb)) - if yarn_nm_mem_in_mb is None: + if not yarn_nm_mem_in_mb: raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.memory-mb' config.") assert (yarn_nm_mem_in_mb > 0.0), "'yarn.nodemanager.resource.memory-mb' current value : {0}. 
" \ @@ -1384,45 +1247,21 @@ class HDP25StackAdvisor(HDP24StackAdvisor): return yarn_nm_mem_in_mb """ - Calculates Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site on initialization if values read is 0. - Else returns the read value. + Determines Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site based on total cluster capacity. """ - def calculate_tez_am_container_size(self, services, total_cluster_capacity): + def calculate_tez_am_container_size(self, total_cluster_capacity): if total_cluster_capacity is None or not isinstance(total_cluster_capacity, long): raise Fail ("Passed-in 'Total Cluster Capacity' is : '{0}'".format(total_cluster_capacity)) - tez_am_resource_memory_mb = self.get_tez_am_resource_memory_mb(services) - calculated_tez_am_resource_memory_mb = None - if tez_am_resource_memory_mb == self.CONFIG_VALUE_UINITIALIZED: - if total_cluster_capacity <= 0: - raise Fail ("Passed-in 'Total Cluster Capacity' ({0}) is Invalid.".format(total_cluster_capacity)) - if total_cluster_capacity <= 4096: - calculated_tez_am_resource_memory_mb = 256 - elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728: - calculated_tez_am_resource_memory_mb = 512 - elif total_cluster_capacity > 73728: - calculated_tez_am_resource_memory_mb = 1536 - Logger.info("Calculated and returning 'tez_am_resource_memory_mb' as : {0}".format(calculated_tez_am_resource_memory_mb)) - return float(calculated_tez_am_resource_memory_mb) - else: - Logger.info("Returning 'tez_am_resource_memory_mb' as : {0}".format(tez_am_resource_memory_mb)) - return float(tez_am_resource_memory_mb) + if total_cluster_capacity <= 0: + raise Fail ("Passed-in 'Total Cluster Capacity' ({0}) is Invalid.".format(total_cluster_capacity)) + if total_cluster_capacity <= 4096: + return 256 + elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728: + return 512 + elif total_cluster_capacity > 73728: + return 1536 - """ - Gets Tez's AM 
resource memory (tez.am.resource.memory.mb) from services. - """ - def get_tez_am_resource_memory_mb(self, services): - tez_am_resource_memory_mb = None - if 'tez.am.resource.memory.mb' in services['configurations']['tez-interactive-site']['properties']: - tez_am_resource_memory_mb = services['configurations']['tez-interactive-site']['properties']['tez.am.resource.memory.mb'] - Logger.info("'tez.am.resource.memory.mb' read from services as : {0}".format(tez_am_resource_memory_mb)) - - if tez_am_resource_memory_mb is None: - raise Fail("Couldn't retrieve tez's 'tez.am.resource.memory.mb' config.") - if tez_am_resource_memory_mb != self.CONFIG_VALUE_UINITIALIZED: - assert (tez_am_resource_memory_mb >= 0), "'tez.am.resource.memory.mb' current value : {0}. " \ - "Expected value : >= 0".format(tez_am_resource_memory_mb) - return tez_am_resource_memory_mb """ Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state. @@ -1437,8 +1276,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor): # Calculate based on minimum size required by containers. yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) slider_am_size = self.calculate_slider_am_size(yarn_min_container_size) - hive_tez_container_size = self.get_hive_tez_container_size(services) - tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap)) + hive_tez_container_size = self.get_hive_tez_container_size(services, configurations) + tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_cap)) normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) + self._normalizeUp\ (hive_tez_container_size, yarn_min_container_size) + self._normalizeUp(tez_am_container_size, yarn_min_container_size) @@ -1473,7 +1312,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor): (2). 
Updates 'llap' queue capacity and state, if current selected queue is 'llap', and only 2 queues exist at root level : 'default' and 'llap'. """ - def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name, llap_queue_cap_perc): + def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name): Logger.info("Determining creation/adjustment of 'capacity-scheduler' for 'llap' queue.") putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) @@ -1484,6 +1323,24 @@ class HDP25StackAdvisor(HDP24StackAdvisor): capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) if capacity_scheduler_properties: leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) + # Get the llap Cluster percentage used for 'llap' Queue creation + if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']: + llap_slider_cap_percentage = int( + services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity']) + min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) + if min_reqd_queue_cap_perc > 100: + min_reqd_queue_cap_perc = 100 + Logger.info("Received 'Minimum Required LLAP queue capacity' : {0}% (out of bounds), adjusted it to : 100%".format(min_reqd_queue_cap_perc)) + + # Adjust 'llap' queue capacity slider value to be minimum required if out of expected bounds. 
+ if llap_slider_cap_percentage <= 0 or llap_slider_cap_percentage > 100: + Logger.info("Adjusting HIVE 'llap_queue_capacity' from {0}% (invalid size) to {1}%".format(llap_slider_cap_percentage, min_reqd_queue_cap_perc)) + putHiveInteractiveEnvProperty('llap_queue_capacity', min_reqd_queue_cap_perc) + llap_slider_cap_percentage = min_reqd_queue_cap_perc + else: + Logger.error("Problem retrieving LLAP Queue Capacity. Skipping creating {0} queue".format(llap_queue_name)) + return + cap_sched_config_keys = capacity_scheduler_properties.keys() yarn_default_queue_capacity = -1 @@ -1521,14 +1378,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if 'default' in leafQueueNames and \ ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or \ ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and \ - ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_queue_cap_perc)))): - adjusted_default_queue_cap = str(100 - llap_queue_cap_perc) + ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_slider_cap_percentage)))): + adjusted_default_queue_cap = str(100 - llap_slider_cap_percentage) hive_user = '*' # Open to all if 'hive_user' in services['configurations']['hive-env']['properties']: hive_user = services['configurations']['hive-env']['properties']['hive_user'] - llap_queue_cap_perc = str(llap_queue_cap_perc) + llap_slider_cap_percentage = str(llap_slider_cap_percentage) # If capacity-scheduler configs are received as one concatenated string, we deposit the changed configs back as # one concatenated string. @@ -1555,9 +1412,9 @@ class HDP25StackAdvisor(HDP24StackAdvisor): + "yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy=fifo\n" \ + "yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent=100\n" \ + "yarn.scheduler.capacity.root." 
+ llap_queue_name + ".maximum-capacity=" \ - + llap_queue_cap_perc + "\n" \ + + llap_slider_cap_percentage + "\n" \ + "yarn.scheduler.capacity.root." + llap_queue_name + ".capacity=" \ - + llap_queue_cap_perc + "\n" \ + + llap_slider_cap_percentage + "\n" \ + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications=" \ + hive_user + "\n" \ + "yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue=" \ @@ -1586,8 +1443,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor): putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".state", "RUNNING") putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy", "fifo") putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent", "100") - putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_queue_cap_perc) - putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_queue_cap_perc) + putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_slider_cap_percentage) + putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_slider_cap_percentage) putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications", hive_user) putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue", hive_user) putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent", "1") @@ -1599,16 +1456,19 @@ class HDP25StackAdvisor(HDP24StackAdvisor): if updated_cap_sched_configs_str or updated_cap_sched_configs_as_dict: if len(leafQueueNames) == 1: # 'llap' queue didn't exist before Logger.info("Created YARN Queue : '{0}' with capacity : {1}%. 
Adjusted 'default' queue capacity to : {2}%" \ - .format(llap_queue_name, llap_queue_cap_perc, adjusted_default_queue_cap)) + .format(llap_queue_name, llap_slider_cap_percentage, adjusted_default_queue_cap)) else: # Queue existed, only adjustments done. - Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_queue_cap_perc)) + Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_slider_cap_percentage)) Logger.info("Adjusted 'default' queue capacity to : {0}%".format(adjusted_default_queue_cap)) # Update Hive 'hive.llap.daemon.queue.name' prop to use 'llap' queue. putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name) putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name) + putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "minimum", min_reqd_queue_cap_perc) + putHiveInteractiveEnvPropertyAttribute('llap_queue_capacity', "maximum", 100) + # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility. - self.setLlapDaemonQueuePropAttributes(services, configurations) + self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) else: Logger.debug("Not creating/adjusting {0} queue. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames))) else: @@ -1687,10 +1547,13 @@ class HDP25StackAdvisor(HDP24StackAdvisor): """ Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes. Takes into account that 'capacity-scheduler' may have changed (got updated) in current Stack Advisor invocation. + + Also, updates the 'llap_queue_capacity' slider visibility. 
""" - def setLlapDaemonQueuePropAttributes(self, services, configurations): + def setLlapDaemonQueuePropAttributesAndCapSliderVisibility(self, services, configurations): Logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.") putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE) + putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") capacity_scheduler_properties = dict() @@ -1740,6 +1603,29 @@ class HDP25StackAdvisor(HDP24StackAdvisor): leafQueues = sorted(leafQueues, key=lambda q: q['value']) putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues) Logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues)) + + # Update 'llap_queue_capacity' slider visibility to 'true' if current selected queue in 'hive.llap.daemon.queue.name' + # is 'llap', else 'false'. + llap_daemon_selected_queue_name = None + llap_queue_selected_in_current_call = None + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + + if self.HIVE_INTERACTIVE_SITE in configurations and \ + 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: + llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + + # Check to see if only 2 queues exist at root level : 'default' and 'llap' and current selected queue in 'hive.llap.daemon.queue.name' + # is 'llap'. 
+ if len(leafQueueNames) == 2 and \ + ((llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == 'llap') or \ + (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == 'llap')): + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true") + Logger.info("Setting LLAP queue capacity slider visibility to 'True'.") + else: + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + Logger.info("Setting LLAP queue capacity slider visibility to 'False'.") else: Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve " "'hive.server2.tez.default.queues' property attributes.") @@ -1775,23 +1661,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor): return llap_selected_queue_state """ - Retrieves the passed in queue's 'AM fraction' from Capacity Scheduler. - """ - def __getQueueAmFractionFromCapacityScheduler(self, capacity_scheduler_properties, llap_daemon_selected_queue_name): - # Identify the key which contains the AM fraction for 'llap_daemon_selected_queue_name'. 
- cap_sched_keys = capacity_scheduler_properties.keys() - llap_selected_queue_am_percent_key = None - for key in cap_sched_keys: - if "yarn.scheduler.capacity.maximum-am-resource-percent" in key: - llap_selected_queue_am_percent_key = key - Logger.info("AM percent key got for {0} queue is : {1}".format(llap_daemon_selected_queue_name, llap_selected_queue_am_percent_key)) - break; - assert(llap_selected_queue_am_percent_key != None), "Couldn't determine '{0}' queue's relevant key for AM percent.".format(llap_daemon_selected_queue_name) - llap_selected_queue_am_percent = capacity_scheduler_properties.get(llap_selected_queue_am_percent_key) - Logger.info("value for key {0} is {1}".format(llap_selected_queue_am_percent_key, llap_selected_queue_am_percent)) - return llap_selected_queue_am_percent - - """ Calculates the total available capacity for the passed-in YARN queue of any level based on the percentages. """ def __getSelectedQueueTotalCap(self, capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity): http://git-wip-us.apache.org/repos/asf/ambari/blob/4fcc4938/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java index 5b520c3..4135919 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java @@ -18,14 +18,31 @@ package org.apache.ambari.server.upgrade; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Module; -import com.google.inject.Provider; +import 
javax.persistence.EntityManager; import junit.framework.Assert; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMockBuilder; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.createStrictMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -44,29 +61,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import javax.persistence.EntityManager; -import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.createMockBuilder; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.createStrictMock; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.newCapture; -import static org.easymock.EasyMock.replay; 
-import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.junit.Assert.assertTrue; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provider; /** * {@link UpgradeCatalog250} unit tests. @@ -211,14 +212,10 @@ public class UpgradeCatalog250Test { Method updateAmsConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateAMSConfigs"); Method updateKafkaConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateKafkaConfigs"); Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml"); - Method updateHIVEInteractiveConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateHIVEInteractiveConfigs"); - Method updateTEZInteractiveConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateTEZInteractiveConfigs"); UpgradeCatalog250 upgradeCatalog250 = createMockBuilder(UpgradeCatalog250.class) .addMockedMethod(updateAmsConfigs) .addMockedMethod(updateKafkaConfigs) - .addMockedMethod(updateHIVEInteractiveConfigs) - .addMockedMethod(updateTEZInteractiveConfigs) .addMockedMethod(addNewConfigurationsFromXml) .createMock(); @@ -232,12 +229,6 @@ public class UpgradeCatalog250Test { upgradeCatalog250.updateKafkaConfigs(); expectLastCall().once(); - upgradeCatalog250.updateHIVEInteractiveConfigs(); - expectLastCall().once(); - - upgradeCatalog250.updateTEZInteractiveConfigs(); - expectLastCall().once(); - replay(upgradeCatalog250); upgradeCatalog250.executeDMLUpdates(); @@ -371,220 +362,4 @@ public class UpgradeCatalog250Test { Map<String, String> updatedProperties = propertiesCapture.getValue(); assertTrue(Maps.difference(newProperties, updatedProperties).areEqual()); } - - @Test - public void testHIVEInteractiveUpdateConfigHiveTezContSize() throws Exception { - Map<String, String> oldProperties 
= new HashMap<String, String>() { - { - put("hive.tez.container.size", "2048"); - } - }; - Map<String, String> newProperties = new HashMap<String, String>() { - { - put("hive.tez.container.size", "SET_ON_FIRST_INVOCATION"); - } - }; - - EasyMockSupport easyMockSupport = new EasyMockSupport(); - - Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); - final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); - Config mockHive = easyMockSupport.createNiceMock(Config.class); - - expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ - put("normal", cluster); - }}).anyTimes(); - expect(cluster.getDesiredConfigByType("hive-interactive-site")).andReturn(mockHive).atLeastOnce(); - expect(mockHive.getProperties()).andReturn(oldProperties).anyTimes(); - - Injector injector = easyMockSupport.createNiceMock(Injector.class); - expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); - - replay(injector, clusters, mockHive, cluster); - - AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) - .addMockedMethod("createConfiguration") - .addMockedMethod("getClusters", new Class[] { }) - .addMockedMethod("createConfig") - .withConstructor(createNiceMock(ActionManager.class), clusters, injector) - .createNiceMock(); - - Injector injector2 = easyMockSupport.createNiceMock(Injector.class); - Capture<Map> propertiesCapture = EasyMock.newCapture(); - - expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); - expect(controller.getClusters()).andReturn(clusters).anyTimes(); - expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(), - 
anyObject(Map.class))).andReturn(createNiceMock(Config.class)).once(); - replay(controller, injector2); - new UpgradeCatalog250(injector2).updateHIVEInteractiveConfigs(); - easyMockSupport.verifyAll(); - - Map<String, String> updatedProperties = propertiesCapture.getValue(); - assertTrue(Maps.difference(updatedProperties, newProperties).areEqual()); - } - - @Test - public void testHIVEInteractiveUpdateConfigHiveJoinSize() throws Exception { - Map<String, String> oldProperties = new HashMap<String, String>() { - { - put("hive.auto.convert.join.noconditionaltask.size", "3"); - } - }; - Map<String, String> newProperties = new HashMap<String, String>() { - { - put("hive.auto.convert.join.noconditionaltask.size", "1000000000"); - } - }; - - EasyMockSupport easyMockSupport = new EasyMockSupport(); - - Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); - final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); - Config mockHive = easyMockSupport.createNiceMock(Config.class); - - expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ - put("normal", cluster); - }}).anyTimes(); - expect(cluster.getDesiredConfigByType("hive-interactive-site")).andReturn(mockHive).atLeastOnce(); - expect(mockHive.getProperties()).andReturn(oldProperties).anyTimes(); - - Injector injector = easyMockSupport.createNiceMock(Injector.class); - expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); - - replay(injector, clusters, mockHive, cluster); - - AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) - .addMockedMethod("createConfiguration") - .addMockedMethod("getClusters", new Class[] { }) - .addMockedMethod("createConfig") - .withConstructor(createNiceMock(ActionManager.class), 
clusters, injector) - .createNiceMock(); - - Injector injector2 = easyMockSupport.createNiceMock(Injector.class); - Capture<Map> propertiesCapture = EasyMock.newCapture(); - - expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); - expect(controller.getClusters()).andReturn(clusters).anyTimes(); - expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(), - anyObject(Map.class))).andReturn(createNiceMock(Config.class)).anyTimes(); - replay(controller, injector2); - new UpgradeCatalog250(injector2).updateHIVEInteractiveConfigs(); - easyMockSupport.verifyAll(); - - Map<String, String> updatedProperties = propertiesCapture.getValue(); - assertTrue(Maps.difference(updatedProperties, newProperties).areEqual()); - } - - @Test - public void testTEZInteractiveUpdateConfigTezRunTimeIoMb() throws Exception { - Map<String, String> oldProperties = new HashMap<String, String>() { - { - put("tez.runtime.io.sort.mb", "1024"); - } - }; - Map<String, String> newProperties = new HashMap<String, String>() { - { - put("tez.runtime.io.sort.mb", "512"); - } - }; - EasyMockSupport easyMockSupport = new EasyMockSupport(); - - Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); - final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); - Config mockHive = easyMockSupport.createNiceMock(Config.class); - - expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ - put("normal", cluster); - }}).anyTimes(); - expect(cluster.getDesiredConfigByType("tez-interactive-site")).andReturn(mockHive).atLeastOnce(); - expect(mockHive.getProperties()).andReturn(oldProperties).anyTimes(); - - Injector injector = easyMockSupport.createNiceMock(Injector.class); - expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); - 
expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); - - replay(injector, clusters, mockHive, cluster); - - AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) - .addMockedMethod("createConfiguration") - .addMockedMethod("getClusters", new Class[] { }) - .addMockedMethod("createConfig") - .withConstructor(createNiceMock(ActionManager.class), clusters, injector) - .createNiceMock(); - - Injector injector2 = easyMockSupport.createNiceMock(Injector.class); - Capture<Map> propertiesCapture = EasyMock.newCapture(); - - expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); - expect(controller.getClusters()).andReturn(clusters).anyTimes(); - expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(), - anyObject(Map.class))).andReturn(createNiceMock(Config.class)).once(); - - replay(controller, injector2); - new UpgradeCatalog250(injector2).updateTEZInteractiveConfigs(); - easyMockSupport.verifyAll(); - - Map<String, String> updatedProperties = propertiesCapture.getValue(); - assertTrue(Maps.difference(newProperties, updatedProperties).areEqual()); - } - - @Test - public void testTEZInteractiveUpdateConfigTezOutputBufferMb() throws Exception { - Map<String, String> oldProperties = new HashMap<String, String>() { - { - put("tez.runtime.unordered.output.buffer.size-mb", "1024"); - } - }; - Map<String, String> newProperties = new HashMap<String, String>() { - { - put("tez.runtime.unordered.output.buffer.size-mb", "100"); - } - }; - EasyMockSupport easyMockSupport = new EasyMockSupport(); - - Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); - final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); - Config mockHive = easyMockSupport.createNiceMock(Config.class); - - expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ - 
put("normal", cluster); - }}).anyTimes(); - expect(cluster.getDesiredConfigByType("tez-interactive-site")).andReturn(mockHive).atLeastOnce(); - expect(mockHive.getProperties()).andReturn(oldProperties).anyTimes(); - - Injector injector = easyMockSupport.createNiceMock(Injector.class); - expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); - expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); - - replay(injector, clusters, mockHive, cluster); - - AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) - .addMockedMethod("createConfiguration") - .addMockedMethod("getClusters", new Class[] { }) - .addMockedMethod("createConfig") - .withConstructor(createNiceMock(ActionManager.class), clusters, injector) - .createNiceMock(); - - Injector injector2 = easyMockSupport.createNiceMock(Injector.class); - Capture<Map> propertiesCapture = EasyMock.newCapture(); - - expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); - expect(controller.getClusters()).andReturn(clusters).anyTimes(); - expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(), - anyObject(Map.class))).andReturn(createNiceMock(Config.class)).anyTimes(); - - replay(controller, injector2); - new UpgradeCatalog250(injector2).updateTEZInteractiveConfigs(); - easyMockSupport.verifyAll(); - - Map<String, String> updatedProperties = propertiesCapture.getValue(); - assertTrue(Maps.difference(newProperties, updatedProperties).areEqual()); - } }
