This is an automated email from the ASF dual-hosted git repository.
yaolei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1bb4a2a7eb AMBARI-25932: fix wrong config file name in spark service
advisor (#3695)
1bb4a2a7eb is described below
commit 1bb4a2a7ebdb95024d943279ff94012d9510e8c9
Author: jialiang <[email protected]>
AuthorDate: Thu May 25 17:25:35 2023 +0800
AMBARI-25932: fix wrong config file name in spark service advisor (#3695)
* AMBARI-25932: fix wrong config file name in spark service advisor
* Trigger CI/CD
* refactor spark2 related variable name to spark
* fix
* remove livy2 in spark advisor ,cause it not supported in bigtop 3.2 stack
* remove spark sac and atlas related code,cause we haven't integrated Atlas
yet.
---
.../BIGTOP/3.2.0/services/SPARK/service_advisor.py | 198 +++++----------------
1 file changed, 44 insertions(+), 154 deletions(-)
diff --git
a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
index c1e4bd05b1..55d8eaed13 100644
---
a/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
+++
b/ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/SPARK/service_advisor.py
@@ -42,10 +42,10 @@ except Exception as e:
traceback.print_exc()
print "Failed to load parent"
-class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
+class SparkServiceAdvisor(service_advisor.ServiceAdvisor):
def __init__(self, *args, **kwargs):
- self.as_super = super(Spark2ServiceAdvisor, self)
+ self.as_super = super(SparkServiceAdvisor, self)
self.as_super.__init__(*args, **kwargs)
# Always call these methods
@@ -78,8 +78,8 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
Must be overriden in child class.
"""
- self.heap_size_properties = {"SPARK2_JOBHISTORYSERVER":
- [{"config-name": "spark2-env",
+ self.heap_size_properties = {"SPARK_JOBHISTORYSERVER":
+ [{"config-name": "spark-env",
"property": "spark_daemon_memory",
"default": "2048m"}]}
@@ -115,7 +115,7 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
Must be overriden in child class.
"""
- return self.getServiceComponentCardinalityValidations(services, hosts,
"SPARK2")
+ return self.getServiceComponentCardinalityValidations(services, hosts,
"SPARK")
def getServiceConfigurationRecommendations(self, configurations,
clusterData, services, hosts):
"""
@@ -125,12 +125,9 @@ class Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
#Logger.info("Class: %s, Method: %s. Recommending Service Configurations."
%
# (self.__class__.__name__, inspect.stack()[0][3]))
- recommender = Spark2Recommender()
- recommender.recommendSpark2ConfigurationsFromHDP25(configurations,
clusterData, services, hosts)
- recommender.recommendSPARK2ConfigurationsFromHDP26(configurations,
clusterData, services, hosts)
- recommender.recommendSPARK2ConfigurationsFromHDP30(configurations,
clusterData, services, hosts)
-
-
+ recommender = SparkRecommender()
+ recommender.recommendSparkConfigurationsFromHDP25(configurations,
clusterData, services, hosts)
+ recommender.recommendSPARKConfigurationsFromHDP26(configurations,
clusterData, services, hosts)
def getServiceConfigurationsValidationItems(self, configurations,
recommendedDefaults, services, hosts):
"""
@@ -141,18 +138,18 @@ class
Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
#Logger.info("Class: %s, Method: %s. Validating Configurations." %
# (self.__class__.__name__, inspect.stack()[0][3]))
- validator = Spark2Validator()
+ validator = SparkValidator()
# Calls the methods of the validator using arguments,
# method(siteProperties, siteRecommendations, configurations, services,
hosts)
return validator.validateListOfConfigUsingMethod(configurations,
recommendedDefaults, services, hosts, validator.validators)
def isComponentUsingCardinalityForLayout(self, componentName):
- return componentName in ('SPARK2_THRIFTSERVER', 'LIVY2_SERVER')
+ return componentName in ('SPARK_THRIFTSERVER')
@staticmethod
def isKerberosEnabled(services, configurations):
"""
- Determines if security is enabled by testing the value of
spark2-defaults/spark.history.kerberos.enabled enabled.
+ Determines if security is enabled by testing the value of
spark-defaults/spark.history.kerberos.enabled enabled.
If the property exists and is equal to "true", then is it enabled;
otherwise is it assumed to be
disabled.
@@ -163,45 +160,38 @@ class
Spark2ServiceAdvisor(service_advisor.ServiceAdvisor):
:rtype: bool
:return: True or False
"""
- if configurations and "spark2-defaults" in configurations and \
- "spark.history.kerberos.enabled" in
configurations["spark2-defaults"]["properties"]:
- return
configurations["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower()
== "true"
- elif services and "spark2-defaults" in services["configurations"] and \
- "spark.history.kerberos.enabled" in
services["configurations"]["spark2-defaults"]["properties"]:
- return
services["configurations"]["spark2-defaults"]["properties"]["spark.history.kerberos.enabled"].lower()
== "true"
+ if configurations and "spark-defaults" in configurations and \
+ "spark.history.kerberos.enabled" in
configurations["spark-defaults"]["properties"]:
+ return
configurations["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower()
== "true"
+ elif services and "spark-defaults" in services["configurations"] and \
+ "spark.history.kerberos.enabled" in
services["configurations"]["spark-defaults"]["properties"]:
+ return
services["configurations"]["spark-defaults"]["properties"]["spark.history.kerberos.enabled"].lower()
== "true"
else:
return False
-class Spark2Recommender(service_advisor.ServiceAdvisor):
+class SparkRecommender(service_advisor.ServiceAdvisor):
"""
- Spark2 Recommender suggests properties when adding the service for the first
time or modifying configs via the UI.
+ Spark Recommender suggests properties when adding the service for the first
time or modifying configs via the UI.
"""
def __init__(self, *args, **kwargs):
- self.as_super = super(Spark2Recommender, self)
+ self.as_super = super(SparkRecommender, self)
self.as_super.__init__(*args, **kwargs)
- def recommendSpark2ConfigurationsFromHDP25(self, configurations,
clusterData, services, hosts):
+ def recommendSparkConfigurationsFromHDP25(self, configurations, clusterData,
services, hosts):
"""
:type configurations dict
:type clusterData dict
:type services dict
:type hosts dict
"""
- putSparkProperty = self.putProperty(configurations, "spark2-defaults",
services)
- putSparkThriftSparkConf = self.putProperty(configurations,
"spark2-thrift-sparkconf", services)
-
- spark_queue = self.recommendYarnQueue(services, "spark2-defaults",
"spark.yarn.queue")
+ putSparkProperty = self.putProperty(configurations, "spark-defaults",
services)
+ spark_queue = self.recommendYarnQueue(services, "spark-defaults",
"spark.yarn.queue")
if spark_queue is not None:
putSparkProperty("spark.yarn.queue", spark_queue)
- spark_thrift_queue = self.recommendYarnQueue(services,
"spark2-thrift-sparkconf", "spark.yarn.queue")
- if spark_thrift_queue is not None:
- putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue)
-
-
- def recommendSPARK2ConfigurationsFromHDP26(self, configurations,
clusterData, services, hosts):
+ def recommendSPARKConfigurationsFromHDP26(self, configurations, clusterData,
services, hosts):
"""
:type configurations dict
:type clusterData dict
@@ -209,70 +199,16 @@ class Spark2Recommender(service_advisor.ServiceAdvisor):
:type hosts dict
"""
- if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations):
-
- spark2_defaults = self.getServicesSiteProperties(services,
"spark2-defaults")
-
- if spark2_defaults:
- putSpark2DafaultsProperty = self.putProperty(configurations,
"spark2-defaults", services)
- putSpark2DafaultsProperty('spark.acls.enable', 'true')
- putSpark2DafaultsProperty('spark.admin.acls', '')
- putSpark2DafaultsProperty('spark.history.ui.acls.enable', 'true')
- putSpark2DafaultsProperty('spark.history.ui.admin.acls', '')
-
-
- self.__addZeppelinToLivy2SuperUsers(configurations, services)
-
-
- def recommendSPARK2ConfigurationsFromHDP30(self, configurations,
clusterData, services, hosts):
-
- # SAC
- if "spark2-atlas-application-properties-override" in
services["configurations"]:
- spark2_atlas_application_properties_override =
self.getServicesSiteProperties(services,
"spark2-atlas-application-properties-override")
- spark2_defaults_properties = self.getServicesSiteProperties(services,
"spark2-defaults")
- spark2_thriftspark_conf_properties =
self.getServicesSiteProperties(services, "spark2-thrift-sparkconf")
- putSpark2DefautlsProperty = self.putProperty(configurations,
"spark2-defaults", services)
- putSpark2DefaultsPropertyAttribute =
self.putPropertyAttribute(configurations,"spark2-defaults")
- putSpark2ThriftSparkConfProperty = self.putProperty(configurations,
"spark2-thrift-sparkconf", services)
- putSpark2AtlasHookProperty = self.putProperty(configurations,
"spark2-atlas-application-properties-override", services)
- putSpark2AtlasHookPropertyAttribute =
self.putPropertyAttribute(configurations,"spark2-atlas-application-properties-override")
- spark2_sac_enabled = None
- if
self.checkSiteProperties(spark2_atlas_application_properties_override,
"atlas.spark.enabled"):
- spark2_sac_enabled =
spark2_atlas_application_properties_override["atlas.spark.enabled"]
- spark2_sac_enabled = str(spark2_sac_enabled).upper() == 'TRUE'
-
- if spark2_sac_enabled:
-
- self.setOrAddValueToProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.driver.extraClassPath",
"/usr/hdp/current/spark-atlas-connector/*", ":")
- self.setOrAddValueToProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.yarn.dist.files",
"/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties",
",")
- self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.driver.extraClassPath",
"/usr/hdp/current/spark-atlas-connector/*", ":")
-
- self.setOrAddValueToProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.extraListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.setOrAddValueToProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.sql.queryExecutionListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.extraListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-
- self.setOrAddValueToProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners",
"com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
- self.setOrAddValueToProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties,
"spark.sql.streaming.streamingQueryListeners",
"com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-
- putSpark2AtlasHookProperty("atlas.client.checkModelInStart", "false")
-
- else:
-
- self.removeValueFromProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.driver.extraClassPath",
"/usr/hdp/current/spark-atlas-connector/*", ":")
- self.removeValueFromProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.yarn.dist.files",
"/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties",
",")
- self.removeValueFromProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.driver.extraClassPath",
"/usr/hdp/current/spark-atlas-connector/*", ":")
+ if SparkServiceAdvisor.isKerberosEnabled(services, configurations):
- self.removeValueFromProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.extraListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.removeValueFromProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.sql.queryExecutionListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.removeValueFromProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.extraListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
- self.removeValueFromProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties, "spark.sql.queryExecutionListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker", ",")
-
- self.removeValueFromProperty(putSpark2DefautlsProperty,
spark2_defaults_properties, "spark.sql.streaming.streamingQueryListeners",
"com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
- self.removeValueFromProperty(putSpark2ThriftSparkConfProperty,
spark2_thriftspark_conf_properties,
"spark.sql.streaming.streamingQueryListeners",
"com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker", ",")
-
- putSpark2AtlasHookPropertyAttribute("atlas.client.checkModelInStart",
"delete", "true")
+ spark_defaults = self.getServicesSiteProperties(services,
"spark-defaults")
+ if spark_defaults:
+ putSparkDafaultsProperty = self.putProperty(configurations,
"spark-defaults", services)
+ putSparkDafaultsProperty('spark.acls.enable', 'true')
+ putSparkDafaultsProperty('spark.admin.acls', '')
+ putSparkDafaultsProperty('spark.history.ui.acls.enable', 'true')
+ putSparkDafaultsProperty('spark.history.ui.admin.acls', '')
def setOrAddValueToProperty(self, putConfigProperty, config, propertyName,
propertyValue, separator):
@@ -289,85 +225,39 @@ class Spark2Recommender(service_advisor.ServiceAdvisor):
else:
putConfigProperty(propertyName,
str(config[propertyName]).replace(separator + propertyValue, ""))
- def __addZeppelinToLivy2SuperUsers(self, configurations, services):
- """
- If Kerberos is enabled AND Zeppelin is installed AND Spark2 Livy Server is
installed, then set
- livy2-conf/livy.superusers to contain the Zeppelin principal name from
- zeppelin-site/zeppelin.server.kerberos.principal
-
- :param configurations:
- :param services:
- """
- if Spark2ServiceAdvisor.isKerberosEnabled(services, configurations):
- zeppelin_site = self.getServicesSiteProperties(services, "zeppelin-site")
-
- if zeppelin_site and 'zeppelin.server.kerberos.principal' in
zeppelin_site:
- zeppelin_principal =
zeppelin_site['zeppelin.server.kerberos.principal']
- zeppelin_user = zeppelin_principal.split('@')[0] if zeppelin_principal
else None
-
- if zeppelin_user:
- livy2_conf = self.getServicesSiteProperties(services, 'livy2-conf')
-
- if livy2_conf:
- superusers = livy2_conf['livy.superusers'] if livy2_conf and
'livy.superusers' in livy2_conf else None
-
- # add the Zeppelin user to the set of users
- if superusers:
- _superusers = superusers.split(',')
- _superusers = [x.strip() for x in _superusers]
- _superusers = filter(None, _superusers) # Removes empty string
elements from array
- else:
- _superusers = []
-
- if zeppelin_user not in _superusers:
- _superusers.append(zeppelin_user)
- putLivy2ConfProperty = self.putProperty(configurations,
'livy2-conf', services)
- putLivy2ConfProperty('livy.superusers', ','.join(_superusers))
-
-class Spark2Validator(service_advisor.ServiceAdvisor):
+class SparkValidator(service_advisor.ServiceAdvisor):
"""
- Spark2 Validator checks the correctness of properties whenever the service
is first added or the user attempts to
+ Spark Validator checks the correctness of properties whenever the service is
first added or the user attempts to
change configs via the UI.
"""
def __init__(self, *args, **kwargs):
- self.as_super = super(Spark2Validator, self)
+ self.as_super = super(SparkValidator, self)
self.as_super.__init__(*args, **kwargs)
- self.validators = [("spark2-defaults",
self.validateSpark2DefaultsFromHDP25),
- ("spark2-thrift-sparkconf",
self.validateSpark2ThriftSparkConfFromHDP25),
- ("spark2-atlas-application-properties-override",
self.validateSpark2AtlasApplicationPropertiesFromHDP30)]
-
-
- def validateSpark2DefaultsFromHDP25(self, properties, recommendedDefaults,
configurations, services, hosts):
- validationItems = [
- {
- "config-name": 'spark.yarn.queue',
- "item": self.validatorYarnQueue(properties, recommendedDefaults,
'spark.yarn.queue', services)
- }
- ]
- return self.toConfigurationValidationProblems(validationItems,
"spark2-defaults")
+ self.validators = [("spark-defaults", self.validateSparkDefaultsFromHDP25),
+ ("spark-atlas-application-properties-override",
self.validateSparkAtlasApplicationPropertiesFromHDP30)]
- def validateSpark2ThriftSparkConfFromHDP25(self, properties,
recommendedDefaults, configurations, services, hosts):
+ def validateSparkDefaultsFromHDP25(self, properties, recommendedDefaults,
configurations, services, hosts):
validationItems = [
{
"config-name": 'spark.yarn.queue',
"item": self.validatorYarnQueue(properties, recommendedDefaults,
'spark.yarn.queue', services)
}
]
- return self.toConfigurationValidationProblems(validationItems,
"spark2-thrift-sparkconf")
+ return self.toConfigurationValidationProblems(validationItems,
"spark-defaults")
- def validateSpark2AtlasApplicationPropertiesFromHDP30(self, properties,
recommendedDefaults, configurations, services, hosts):
+ def validateSparkAtlasApplicationPropertiesFromHDP30(self, properties,
recommendedDefaults, configurations, services, hosts):
validationItems = []
servicesList = [service["StackServices"]["service_name"] for service in
services["services"]]
- if not "ATLAS" in servicesList and 'atlas.spark.enabled' in
services['configurations']['spark2-atlas-application-properties-override']['properties']
and \
-
str(services['configurations']['spark2-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper()
== 'TRUE':
- validationItems.append({"config-name":
"spark2-atlas-application-properties-override",
+ if not "ATLAS" in servicesList and 'atlas.spark.enabled' in
services['configurations']['spark-atlas-application-properties-override']['properties']
and \
+
str(services['configurations']['spark-atlas-application-properties-override']['properties']['atlas.spark.enabled']).upper()
== 'TRUE':
+ validationItems.append({"config-name":
"spark-atlas-application-properties-override",
+ "item":
self.getErrorItem("SAC could be enabled only if ATLAS service is available on
cluster!")})
- return self.toConfigurationValidationProblems(validationItems,
"spark2-atlas-application-properties-override")
+ return self.toConfigurationValidationProblems(validationItems,
"spark-atlas-application-properties-override")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]