AMBARI-20281. Porting changes made in 2.5/stack_advisor to 
3.0/YARN/service_advisor for LLAP.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fc8128b0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fc8128b0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fc8128b0

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: fc8128b0de0a97c3c8ef9e7064776a5f0030ae6a
Parents: 8e6c321
Author: Swapan Shridhar <sshrid...@hortonworks.com>
Authored: Thu Mar 2 12:56:54 2017 -0800
Committer: Swapan Shridhar <sshrid...@hortonworks.com>
Committed: Thu Mar 2 13:26:38 2017 -0800

----------------------------------------------------------------------
 .../YARN/3.0.0.3.0/service_advisor.py           | 90 ++++++++++++--------
 1 file changed, 55 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/fc8128b0/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
 
b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
index 18938a3..3629f30 100644
--- 
a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
+++ 
b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
@@ -509,10 +509,12 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
     DEFAULT_EXECUTOR_TO_AM_RATIO = 20
     MIN_EXECUTOR_TO_AM_RATIO = 10
     MAX_CONCURRENT_QUERIES = 32
+    MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS = 4 # Concurrency for clusters with 
<10 executors
     leafQueueNames = None
     MB_TO_BYTES = 1048576
     hsi_site = self.getServicesSiteProperties(services, 
YARNRecommender.HIVE_INTERACTIVE_SITE)
     yarn_site = self.getServicesSiteProperties(services, "yarn-site")
+    min_memory_required = 0
 
     # Update 'hive.llap.daemon.queue.name' prop combo entries
     self.setLlapDaemonQueuePropAttributes(services, configurations)
@@ -537,8 +539,10 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
       # Check if it's 1st invocation after enabling Hive Server Interactive 
(config: enable_hive_interactive).
       changed_configs_has_enable_hive_int = 
self.isConfigPropertiesChanged(services, "hive-interactive-env", 
['enable_hive_interactive'], False)
       llap_named_queue_selected_in_curr_invocation = None
-      if changed_configs_has_enable_hive_int \
-          and 
services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
+      # Check if its : 1. 1st invocation from UI ('enable_hive_interactive' in 
changed-configurations)
+      # OR 2. 1st invocation from BP (services['changed-configurations'] 
should be empty in this case)
+      if (changed_configs_has_enable_hive_int or  0 == 
len(services['changed-configurations'])) \
+        and 
services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
         if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and 
llap_queue_name in leafQueueNames):
           llap_named_queue_selected_in_curr_invocation = True
           putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', 
llap_queue_name)
@@ -552,15 +556,15 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
 
       if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and 
llap_daemon_selected_queue_name == llap_queue_name) or
             llap_named_queue_selected_in_curr_invocation) or \
-          (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 
'default' and llap_named_queue_selected_in_curr_invocation):
-        Logger.info("Setting visibility of num_llap_nodes to true.")
-        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", 
"true")
+        (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 
'default' and llap_named_queue_selected_in_curr_invocation):
+        Logger.info("DBG: Setting 'num_llap_nodes' config's  READ ONLY 
attribute as 'False'.")
+        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", 
"false")
         selected_queue_is_ambari_managed_llap = True
         Logger.info("DBG: Selected YARN queue for LLAP is : '{0}'. Current 
YARN queues : {1}. Setting 'Number of LLAP nodes' "
                     "slider visibility to 'True'".format(llap_queue_name, 
list(leafQueueNames)))
       else:
-        Logger.info("Setting visibility of num_llap_nodes to false.")
-        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", 
"false")
+        Logger.info("DBG: Setting 'num_llap_nodes' config's  READ ONLY 
attribute as 'True'.")
+        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", 
"true")
         Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN 
queues : {1}. Setting 'Number of LLAP nodes' "
                     "visibility to 
'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames)))
         selected_queue_is_ambari_managed_llap = False
@@ -640,6 +644,25 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
       self.recommendDefaultLlapConfiguration(configurations, services, hosts)
       return
 
+    # Get calculated value for Slider AM container Size
+    slider_am_container_size = 
self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
+                                                 yarn_min_container_size)
+    Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using 
following: yarn_min_container_size : "
+                "{1}".format(slider_am_container_size, 
yarn_min_container_size))
+
+    min_memory_required = normalized_tez_am_container_size + 
slider_am_container_size + self._normalizeUp(mem_per_thread_for_llap, 
yarn_min_container_size)
+    Logger.info("DBG: Calculated 'min_memory_required': {0} using following : 
slider_am_container_size: {1}, "
+                "normalized_tez_am_container_size : {2}, 
mem_per_thread_for_llap : {3}, yarn_min_container_size : "
+                "{4}".format(min_memory_required, slider_am_container_size, 
normalized_tez_am_container_size, mem_per_thread_for_llap, 
yarn_min_container_size))
+
+    min_nodes_required = int(ceil( min_memory_required / 
yarn_nm_mem_in_mb_normalized))
+    Logger.info("DBG: Calculated 'min_node_required': {0}, using following : 
min_memory_required : {1}, yarn_nm_mem_in_mb_normalized "
+                ": {2}".format(min_nodes_required, min_memory_required, 
yarn_nm_mem_in_mb_normalized))
+    if min_nodes_required > node_manager_cnt:
+      Logger.warning("ERROR: Not enough memory/nodes to run LLAP");
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+
     mem_per_thread_for_llap = float(mem_per_thread_for_llap)
 
     Logger.info("DBG: selected_queue_is_ambari_managed_llap = 
{0}".format(selected_queue_is_ambari_managed_llap))
@@ -656,16 +679,27 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
       Logger.info("DBG: Calculated '{0}' queue available capacity : {1}, using 
following: llap_daemon_selected_queue_cap : {2}, "
                   "yarn_min_container_size : 
{3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized,
                                                          
llap_daemon_selected_queue_cap, yarn_min_container_size))
-      '''Rounding up numNodes so that we run more daemons, and utilitze more 
CPUs. The rest of the calcaulkations will take care of cutting this down if 
required'''
+      '''Rounding up numNodes so that we run more daemons, and utilitze more 
CPUs. The rest of the calcaulations will take care of cutting this down if 
required'''
       num_llap_nodes_requested = ceil(total_llap_mem_normalized / 
yarn_nm_mem_in_mb_normalized)
       Logger.info("DBG: Calculated 'num_llap_nodes_requested' : {0}, using 
following: total_llap_mem_normalized : {1}, "
                   "yarn_nm_mem_in_mb_normalized : 
{2}".format(num_llap_nodes_requested, total_llap_mem_normalized, 
yarn_nm_mem_in_mb_normalized))
+      # Pouplate the 'num_llap_nodes_requested' in config 'num_llap_nodes', a 
read only config for non-Ambari managed queue case.
+      putHiveInteractiveEnvProperty('num_llap_nodes', num_llap_nodes_requested)
+      Logger.info("Setting config 'num_llap_nodes' as : 
{0}".format(num_llap_nodes_requested))
       queue_am_fraction_perc = 
float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties,
 llap_daemon_selected_queue_name))
       hive_tez_am_cap_available = queue_am_fraction_perc * 
total_llap_mem_normalized
       Logger.info("DBG: Calculated 'hive_tez_am_cap_available' : {0}, using 
following: queue_am_fraction_perc : {1}, "
                   "total_llap_mem_normalized : 
{2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, 
total_llap_mem_normalized))
     else:  # Ambari managed 'llap' named queue at root level.
-      num_llap_nodes_requested = self.get_num_llap_nodes(services, 
configurations) #Input
+      # Set 'num_llap_nodes_requested' for 1st invocation, as it gets passed 
as 1 otherwise, read from config.
+
+      # Check if its : 1. 1st invocation from UI ('enable_hive_interactive' in 
changed-configurations)
+      # OR 2. 1st invocation from BP (services['changed-configurations'] 
should be empty in this case)
+      if (changed_configs_has_enable_hive_int or  0 == 
len(services['changed-configurations'])) \
+        and 
services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
+        num_llap_nodes_requested = min_nodes_required
+      else:
+        num_llap_nodes_requested = self.get_num_llap_nodes(services, 
configurations) #Input
       total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
       Logger.info("DBG: Calculated 'total_llap_mem' : {0}, using following: 
num_llap_nodes_requested : {1}, "
                   "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, 
num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized))
@@ -689,12 +723,6 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
 
     # Common calculations now, irrespective of the queue selected.
 
-    # Get calculated value for Slider AM container Size
-    slider_am_container_size = 
self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
-                                                 yarn_min_container_size)
-    Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using 
following: yarn_min_container_size : "
-                "{1}".format(slider_am_container_size, 
yarn_min_container_size))
-
     llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - 
slider_am_container_size
     Logger.info("DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using 
following : total_llap_mem_normalized : {1}, "
                 "slider_am_container_size : 
{2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, 
slider_am_container_size))
@@ -771,9 +799,9 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
                 "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap 
: {4}, normalized_tez_am_container_size : "
                 "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, 
llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO,
                              mem_per_thread_for_llap, 
normalized_tez_am_container_size))
-    if max_llap_concurreny == 0:
-      max_llap_concurreny = 1
-      Logger.info("DBG: Adjusted 'max_llap_concurreny' : 1.")
+    if int(max_llap_concurreny) < MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS:
+      Logger.info("DBG: Adjusting 'max_llap_concurreny' from {0} to 
{1}".format(max_llap_concurreny, MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS))
+      max_llap_concurreny = MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS
 
     if (max_llap_concurreny * normalized_tez_am_container_size) > 
hive_tez_am_cap_available:
       max_llap_concurreny = floor(hive_tez_am_cap_available / 
normalized_tez_am_container_size)
@@ -808,8 +836,8 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
                 "yarn_min_container_size: 
{3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, 
num_llap_nodes_requested, yarn_min_container_size))
     if llap_daemon_mem_per_node == 0:
       # Small cluster. No capacity left on a node after running AMs.
-      llap_daemon_mem_per_node = mem_per_thread_for_llap
-      num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+      llap_daemon_mem_per_node = self._normalizeUp(mem_per_thread_for_llap, 
yarn_min_container_size)
+      num_llap_nodes = floor(llap_mem_daemon_size / llap_daemon_mem_per_node)
       Logger.info("DBG: 'llap_daemon_mem_per_node' : 0, adjusted 
'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: 
llap_mem_daemon_size : {2}, "
                   "mem_per_thread_for_llap : 
{3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, 
mem_per_thread_for_llap))
     elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
@@ -882,7 +910,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
     
putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue',
 "maximum", max_llap_concurreny)
 
     num_llap_nodes = long(num_llap_nodes)
-    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 
min_nodes_required)
     putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", 
node_manager_cnt)
     #TODO A single value is not being set for numNodes in case of a custom 
queue. Also the attribute is set to non-visible, so the UI likely ends up using 
an old cached value
     if (num_llap_nodes != num_llap_nodes_requested):
@@ -890,6 +918,9 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
     else:
       Logger.info("DBG: Used num_llap_nodes for calculations : 
{0}".format(num_llap_nodes_requested))
 
+    putHiveInteractiveEnvProperty('num_llap_nodes_for_llap_daemons', 
num_llap_nodes)
+    Logger.info("DBG: Setting config 'num_llap_nodes_for_llap_daemons' as : 
{0}".format(num_llap_nodes))
+
     llap_container_size = long(llap_daemon_mem_per_node)
     putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', 
llap_container_size)
 
@@ -909,6 +940,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
     
putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', 
hive_auto_convert_join_noconditionaltask_size)
 
     num_executors_per_node = long(num_executors_per_node)
+    Logger.info("DBG: Putting num_executors_per_node as 
{0}".format(num_executors_per_node))
     putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 
num_executors_per_node)
     putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', 
"minimum", 1)
     putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', 
"maximum", long(num_executors_per_node_max))
@@ -948,6 +980,7 @@ class YARNRecommender(service_advisor.ServiceAdvisor):
     
putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue',
 "minimum", 1)
     
putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue',
 "maximum", 1)
     putHiveInteractiveEnvProperty('num_llap_nodes', 0)
+    putHiveInteractiveEnvProperty('num_llap_nodes_for_llap_daemons', 0)
     putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
     putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", 
node_manager_cnt)
     putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', 
yarn_min_container_size)
@@ -1344,7 +1377,7 @@ 
yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu
     # Check if services["changed-configurations"] is empty and 
'yarn.scheduler.minimum-allocation-mb' is modified in current ST invocation.
     if not services["changed-configurations"] and yarn_site and 
yarn_min_allocation_property in yarn_site:
       yarn_min_container_size = yarn_site[yarn_min_allocation_property]
-      Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from 
configurations as : {0}".format(yarn_min_container_size))
+      Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from 
output as : {0}".format(yarn_min_container_size))
 
     # Check if 'yarn.scheduler.minimum-allocation-mb' is input in services 
array.
     elif yarn_site_properties and yarn_min_allocation_property in 
yarn_site_properties:
@@ -1369,19 +1402,6 @@ 
yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu
     if yarn_min_container_size < 256:
       return 256
 
-  def calculate_slider_am_size(self, yarn_min_container_size):
-    """
-    Calculates the Slider App Master size based on YARN's Minimum Container 
Size.
-
-    :type yarn_min_container_size int
-    """
-    if yarn_min_container_size > 1024:
-      return 1024
-    if yarn_min_container_size >= 256 and yarn_min_container_size <= 1024:
-      return yarn_min_container_size
-    if yarn_min_container_size < 256:
-      return 256
-
   def get_yarn_nm_mem_in_mb(self, services, configurations):
     """
     Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).

Reply via email to