AMBARI-20485 HDP 3.0 TP - create Service Advisor for Spark (dsen)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/845e9215
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/845e9215
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/845e9215

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 845e921534b69ed4464f0304e0a639ce84084953
Parents: 1aba730
Author: Dmytro Sen <[email protected]>
Authored: Wed Mar 22 18:34:04 2017 +0200
Committer: Dmytro Sen <[email protected]>
Committed: Wed Mar 22 18:34:04 2017 +0200

----------------------------------------------------------------------
 .../SPARK/2.2.0/service_advisor.py              | 202 +++++++++++++++++++
 1 file changed, 202 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/845e9215/ambari-server/src/main/resources/common-services/SPARK/2.2.0/service_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/service_advisor.py
 
b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/service_advisor.py
new file mode 100644
index 0000000..ee803b0
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/service_advisor.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+
+
+from resource_management.core.logger import Logger
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+try:
+  with open(PARENT_FILE, 'rb') as fp:
+    service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, 
('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+  traceback.print_exc()
+  print "Failed to load parent"
+
+class SparkServiceAdvisor(service_advisor.ServiceAdvisor):
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(SparkServiceAdvisor, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # Always call these methods
+    self.modifyMastersWithMultipleInstances()
+    self.modifyCardinalitiesDict()
+    self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyCardinalitiesDict(self):
+    """
+    Modify the dictionary of cardinalities.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyHeapSizeProperties(self):
+    """
+    Modify the dictionary of heap size properties.
+    Must be overriden in child class.
+    """
+    self.heap_size_properties = {"SPARK_JOBHISTORYSERVER":
+                                   [{"config-name": "spark-env",
+                                     "property": "spark_daemon_memory",
+                                     "default": "1024m"}]
+                                 }
+
+  def modifyNotValuableComponents(self):
+    """
+    Modify the set of components whose host assignment is based on other 
services.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Modify the set of components that are not preferable on the server.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of errors.
+    Must be overriden in child class.
+    """
+    # TODO this is from HDP25StackAdvisor, do we have something similar in 
Service Advisor?
+    #
+    # return super(HDP25StackAdvisor, 
self).isComponentUsingCardinalityForLayout (componentName) or  componentName in 
['SPARK_THRIFTSERVER', 'LIVY_SERVER']
+
+    # Nothing to do
+    return []
+
+  def getServiceConfigurationRecommendations(self, configurations, 
clusterData, services, hosts):
+    """
+    Entry point.
+    Must be overriden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+    recommender = SparkRecommender()
+
+    recommender.recommendSparkConfigurationsFromHDP25(configurations, 
clusterData, services, hosts)
+    # Nothing to do
+    pass
+
+  def getServiceConfigurationsValidationItems(self, configurations, 
recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    validator = SparkValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, 
hosts)
+    return validator.validateListOfConfigUsingMethod(configurations, 
recommendedDefaults, services, hosts, validator.validators)
+
+class SparkRecommender(service_advisor.ServiceAdvisor):
+  """
+  Spark Recommender suggests properties when adding the service for the first 
time or modifying configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(SparkRecommender, self)
+    self.as_super.__init__(*args, **kwargs)
+
+  def recommendSparkConfigurationsFromHDP25(self, configurations, clusterData, 
services, hosts):
+    """
+    :type configurations dict
+    :type clusterData dict
+    :type services dict
+    :type hosts dict
+    """
+    putSparkProperty = self.putProperty(configurations, "spark-defaults", 
services)
+    putSparkThriftSparkConf = self.putProperty(configurations, 
"spark-thrift-sparkconf", services)
+
+    spark_queue = self.recommendYarnQueue(services, "spark-defaults", 
"spark.yarn.queue")
+    if spark_queue is not None:
+      putSparkProperty("spark.yarn.queue", spark_queue)
+
+    spark_thrift_queue = self.recommendYarnQueue(services, 
"spark-thrift-sparkconf", "spark.yarn.queue")
+    if spark_thrift_queue is not None:
+      putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue)
+
+class SparkValidator(service_advisor.ServiceAdvisor):
+  """
+  Spark Validator checks the correctness of properties whenever the service is 
first added or the user attempts to
+  change configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(SparkValidator, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    self.validators = [("spark-defaults", self.validateSparkDefaultsFromHDP25),
+                       ("spark-thrift-sparkconf", 
self.validateSparkThriftSparkConfFromHDP25)]
+
+
+  def validateSparkDefaultsFromHDP25(self, properties, recommendedDefaults, 
configurations, services, hosts):
+    validationItems = [
+      {
+        "config-name": 'spark.yarn.queue',
+        "item": self.validatorYarnQueue(properties, recommendedDefaults, 
'spark.yarn.queue', services)
+      }
+    ]
+    return self.toConfigurationValidationProblems(validationItems, 
"spark-defaults")
+
+  def validateSparkThriftSparkConfFromHDP25(self, properties, 
recommendedDefaults, configurations, services, hosts):
+    validationItems = [
+      {
+        "config-name": 'spark.yarn.queue',
+        "item": self.validatorYarnQueue(properties, recommendedDefaults, 
'spark.yarn.queue', services)
+      }
+    ]
+    return self.toConfigurationValidationProblems(validationItems, 
"spark-thrift-sparkconf")
\ No newline at end of file

Reply via email to