http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py
new file mode 100644
index 0000000..ab58cb6
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py
@@ -0,0 +1,266 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import socket
+
+import status_params
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.copy_tarball import get_sysprep_skip_copy_tarballs_hdfs
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.script.script import Script
+
+# a map of the Ambari role to the component name
+# for use with <stack-root>/current/<component>
+SERVER_ROLE_DIRECTORY_MAP = {
+  'SPARK_JOBHISTORYSERVER' : 'spark-historyserver',
+  'SPARK_CLIENT' : 'spark-client',
+  'SPARK_THRIFTSERVER' : 'spark-thriftserver',
+  'LIVY_SERVER' : 'livy-server',
+  'LIVY_CLIENT' : 'livy-client'
+}
+
+component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "SPARK_CLIENT")
+
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+
+stack_name = status_params.stack_name
+stack_root = Script.get_stack_root()
+stack_version_unformatted = config['hostLevelParams']['stack_version']
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+
+sysprep_skip_copy_tarballs_hdfs = get_sysprep_skip_copy_tarballs_hdfs()
+
+# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade
+version = default("/commandParams/version", None)
+
+spark_conf = '/etc/spark/conf'
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+
+if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+  hadoop_home = stack_select.get_hadoop_dir("home")
+  spark_conf = format("{stack_root}/current/{component_directory}/conf")
+  spark_log_dir = config['configurations']['spark-env']['spark_log_dir']
+  spark_pid_dir = status_params.spark_pid_dir
+  spark_home = format("{stack_root}/current/{component_directory}")
+
+spark_daemon_memory = config['configurations']['spark-env']['spark_daemon_memory']
+spark_thrift_server_conf_file = spark_conf + "/spark-thrift-sparkconf.conf"
+java_home = config['hostLevelParams']['java_home']
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+user_group = config['configurations']['cluster-env']['user_group']
+
+spark_user = status_params.spark_user
+hive_user = status_params.hive_user
+spark_group = status_params.spark_group
+user_group = status_params.user_group
+spark_hdfs_user_dir = format("/user/{spark_user}")
+spark_history_dir = default('/configurations/spark-defaults/spark.history.fs.logDirectory', "hdfs:///spark-history")
+
+spark_history_server_pid_file = status_params.spark_history_server_pid_file
+spark_thrift_server_pid_file = status_params.spark_thrift_server_pid_file
+
+spark_history_server_start = format("{spark_home}/sbin/start-history-server.sh")
+spark_history_server_stop = format("{spark_home}/sbin/stop-history-server.sh")
+
+spark_thrift_server_start = format("{spark_home}/sbin/start-thriftserver.sh")
+spark_thrift_server_stop = format("{spark_home}/sbin/stop-thriftserver.sh")
+spark_hadoop_lib_native = format("{stack_root}/current/hadoop-client/lib/native:{stack_root}/current/hadoop-client/lib/native/Linux-amd64-64")
+
+run_example_cmd = format("{spark_home}/bin/run-example")
+spark_smoke_example = "SparkPi"
+spark_service_check_cmd = format(
+  "{run_example_cmd} --master yarn --deploy-mode cluster --num-executors 1 --driver-memory 256m --executor-memory 256m --executor-cores 1 {spark_smoke_example} 1")
+
+spark_jobhistoryserver_hosts = default("/clusterHostInfo/spark_jobhistoryserver_hosts", [])
+
+if len(spark_jobhistoryserver_hosts) > 0:
+  spark_history_server_host = spark_jobhistoryserver_hosts[0]
+else:
+  spark_history_server_host = "localhost"
+
+# spark-defaults params
+ui_ssl_enabled = default("configurations/spark-defaults/spark.ssl.enabled", False)
+
+spark_yarn_historyServer_address = default(spark_history_server_host, "localhost")
+spark_history_scheme = "http"
+spark_history_ui_port = config['configurations']['spark-defaults']['spark.history.ui.port']
+
+if ui_ssl_enabled:
+  spark_history_ui_port = str(int(spark_history_ui_port) + 400)
+  spark_history_scheme = "https"
+
+
+spark_env_sh = config['configurations']['spark-env']['content']
+spark_log4j_properties = config['configurations']['spark-log4j-properties']['content']
+spark_metrics_properties = config['configurations']['spark-metrics-properties']['content']
+
+hive_server_host = default("/clusterHostInfo/hive_server_host", [])
+is_hive_installed = not len(hive_server_host) == 0
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+spark_kerberos_keytab = config['configurations']['spark-defaults']['spark.history.kerberos.keytab']
+spark_kerberos_principal = config['configurations']['spark-defaults']['spark.history.kerberos.principal']
+smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name']
+
+spark_thriftserver_hosts = default("/clusterHostInfo/spark_thriftserver_hosts", [])
+has_spark_thriftserver = not len(spark_thriftserver_hosts) == 0
+
+# hive-site params
+spark_hive_properties = {
+  'hive.metastore.uris': config['configurations']['hive-site']['hive.metastore.uris']
+}
+
+# security settings
+if security_enabled:
+  spark_principal = spark_kerberos_principal.replace('_HOST', spark_history_server_host.lower())
+
+  if is_hive_installed:
+    spark_hive_properties.update({
+      'hive.metastore.sasl.enabled': str(config['configurations']['hive-site']['hive.metastore.sasl.enabled']).lower(),
+      'hive.metastore.kerberos.keytab.file': config['configurations']['hive-site']['hive.metastore.kerberos.keytab.file'],
+      'hive.server2.authentication.spnego.principal': config['configurations']['hive-site']['hive.server2.authentication.spnego.principal'],
+      'hive.server2.authentication.spnego.keytab': config['configurations']['hive-site']['hive.server2.authentication.spnego.keytab'],
+      'hive.metastore.kerberos.principal': config['configurations']['hive-site']['hive.metastore.kerberos.principal'],
+      'hive.server2.authentication.kerberos.principal': config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'],
+      'hive.server2.authentication.kerberos.keytab': config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab'],
+      'hive.server2.authentication': config['configurations']['hive-site']['hive.server2.authentication'],
+    })
+
+    hive_kerberos_keytab = config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab']
+    hive_kerberos_principal = config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'].replace('_HOST', socket.getfqdn().lower())
+
+# thrift server support - available on HDP 2.3 or higher
+spark_thrift_sparkconf = None
+spark_thrift_cmd_opts_properties = ''
+spark_thrift_fairscheduler_content = None
+spark_thrift_master = "yarn-client"
+if 'nm_hosts' in config['clusterHostInfo'] and len(config['clusterHostInfo']['nm_hosts']) == 1:
+  # use local mode when there's only one nodemanager
+  spark_thrift_master = "local[4]"
+
+if has_spark_thriftserver and 'spark-thrift-sparkconf' in config['configurations']:
+  spark_thrift_sparkconf = config['configurations']['spark-thrift-sparkconf']
+  spark_thrift_cmd_opts_properties = config['configurations']['spark-env']['spark_thrift_cmd_opts']
+  if is_hive_installed:
+    # update default metastore client properties (async wait for metastore component); this is useful in case of
+    # blueprint provisioning, when hive-metastore and spark-thriftserver are not on the same host.
+    spark_hive_properties.update({
+      'hive.metastore.client.socket.timeout' : config['configurations']['hive-site']['hive.metastore.client.socket.timeout']
+    })
+  spark_hive_properties.update(config['configurations']['spark-hive-site-override'])
+
+  if 'spark-thrift-fairscheduler' in config['configurations'] and 'fairscheduler_content' in config['configurations']['spark-thrift-fairscheduler']:
+    spark_thrift_fairscheduler_content = config['configurations']['spark-thrift-fairscheduler']['fairscheduler_content']
+
+default_fs = config['configurations']['core-site']['fs.defaultFS']
+hdfs_site = config['configurations']['hdfs-site']
+hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
+
+ats_host = set(default("/clusterHostInfo/app_timeline_server_hosts", []))
+has_ats = len(ats_host) > 0
+
+dfs_type = default("/commandParams/dfs_type", "")
+
+# livy related config
+
+# livy for spark is only supported from HDP 2.6
+has_livyserver = False
+
+if stack_version_formatted and check_stack_feature(StackFeature.SPARK_LIVY, stack_version_formatted):
+  livy_component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "LIVY_SERVER")
+  livy_conf = format("{stack_root}/current/{livy_component_directory}/conf")
+  livy_log_dir = config['configurations']['livy-env']['livy_log_dir']
+  livy_pid_dir = status_params.livy_pid_dir
+  livy_home = format("{stack_root}/current/{livy_component_directory}")
+  livy_user = status_params.livy_user
+  livy_group = status_params.livy_group
+  user_group = status_params.user_group
+  livy_hdfs_user_dir = format("/user/{livy_user}")
+  livy_server_pid_file = status_params.livy_server_pid_file
+  livy_recovery_dir = default("/configurations/livy-conf/livy.server.recovery.state-store.url", "/livy-recovery")
+
+  livy_server_start = format("{livy_home}/bin/livy-server start")
+  livy_server_stop = format("{livy_home}/bin/livy-server stop")
+  livy_logs_dir = format("{livy_home}/logs")
+
+  livy_env_sh = config['configurations']['livy-env']['content']
+  livy_log4j_properties = config['configurations']['livy-log4j-properties']['content']
+  livy_spark_blacklist_properties = config['configurations']['livy-spark-blacklist']['content']
+
+  if 'livy.server.kerberos.keytab' in config['configurations']['livy-conf']:
+    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.kerberos.keytab']
+  else:
+    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.launch.kerberos.keytab']
+  if 'livy.server.kerberos.principal' in config['configurations']['livy-conf']:
+    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.kerberos.principal']
+  else:
+    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.launch.kerberos.principal']
+
+  livy_livyserver_hosts = default("/clusterHostInfo/livy_server_hosts", [])
+
+  # ats 1.5 properties
+  entity_groupfs_active_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.active-dir']
+  entity_groupfs_active_dir_mode = 01777
+  entity_groupfs_store_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.done-dir']
+  entity_groupfs_store_dir_mode = 0700
+  is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled']
+
+  if len(livy_livyserver_hosts) > 0:
+    has_livyserver = True
+    if security_enabled:
+      livy_principal = livy_kerberos_principal.replace('_HOST', config['hostname'].lower())
+
+  livy_livyserver_port = default('configurations/livy-conf/livy.server.port', 8999)
+
+
+import functools
+# create partial functions with common arguments for every HdfsResource call
+# to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file=hdfs_resource_ignore_file,
+  security_enabled=security_enabled,
+  keytab=hdfs_user_keytab,
+  kinit_path_local=kinit_path_local,
+  hadoop_bin_dir=hadoop_bin_dir,
+  hadoop_conf_dir=hadoop_conf_dir,
+  principal_name=hdfs_principal_name,
+  hdfs_site=hdfs_site,
+  default_fs=default_fs,
+  immutable_paths=get_not_managed_resources(),
+  dfs_type=dfs_type
+)
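Note: a minimal standalone sketch of the functools.partial pattern used at the end of params.py above — the cluster-wide arguments are bound once, so later call sites pass only the resource-specific ones. DummyResource is a hypothetical stand-in for HdfsResource, for illustration only.

    import functools

    class DummyResource(object):
        # hypothetical stand-in for HdfsResource; just records the merged arguments
        def __init__(self, path, **kwargs):
            self.path = path
            self.kwargs = kwargs

    # bind the arguments every call would otherwise have to repeat,
    # mirroring HdfsResource = functools.partial(HdfsResource, ...) above
    DummyResource = functools.partial(DummyResource, user="hdfs", security_enabled=False)

    r = DummyResource("/user/spark", type="directory", mode=0775)
    print r.path, r.kwargs   # per-call args are merged with the pre-bound ones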
http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
new file mode 100644
index 0000000..518c624
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
@@ -0,0 +1,63 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import subprocess
+import time
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.format import format
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Execute
+from resource_management.core.logger import Logger
+
+class SparkServiceCheck(Script):
+  def service_check(self, env):
+    import params
+    env.set_params(params)
+
+    if params.security_enabled:
+      spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
+      Execute(spark_kinit_cmd, user=params.spark_user)
+      if params.has_livyserver:
+        livy_kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ")
+        Execute(livy_kinit_cmd, user=params.livy_user)
+
+    Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k {spark_history_scheme}://{spark_history_server_host}:{spark_history_ui_port} | grep 200"),
+            tries=5,
+            try_sleep=3,
+            logoutput=True
+    )
+    if params.has_livyserver:
+      live_livyserver_host = ""
+      for livyserver_host in params.livy_livyserver_hosts:
+        try:
+          Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k http://{livyserver_host}:{livy_livyserver_port}/sessions | grep 200"),
+                  tries=3,
+                  try_sleep=1,
+                  logoutput=True,
+                  user=params.livy_user
+          )
+          live_livyserver_host = livyserver_host
+          break
+        except:
+          pass
+      if len(params.livy_livyserver_hosts) > 0 and live_livyserver_host == "":
+        raise Fail(format("Connection to all Livy servers failed"))
+
+if __name__ == "__main__":
+  SparkServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_livy.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_livy.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_livy.py
new file mode 100644
index 0000000..adaca87
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_livy.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+from resource_management import Directory, File, PropertiesFile, InlineTemplate, format
+
+
+def setup_livy(env, type, upgrade_type=None, action=None):
+  import params
+
+  Directory([params.livy_pid_dir, params.livy_log_dir],
+            owner=params.livy_user,
+            group=params.user_group,
+            mode=0775,
+            create_parents=True
+  )
+  if type == 'server' and action == 'config':
+    params.HdfsResource(params.livy_hdfs_user_dir,
+                        type="directory",
+                        action="create_on_execute",
+                        owner=params.livy_user,
+                        mode=0775
+    )
+    params.HdfsResource(None, action="execute")
+
+    params.HdfsResource(params.livy_recovery_dir,
+                        type="directory",
+                        action="create_on_execute",
+                        owner=params.livy_user,
+                        mode=0700
+    )
+    params.HdfsResource(None, action="execute")
+
+  # create livy-env.sh in etc/conf dir
+  File(os.path.join(params.livy_conf, 'livy-env.sh'),
+       owner=params.livy_user,
+       group=params.livy_group,
+       content=InlineTemplate(params.livy_env_sh),
+       mode=0644,
+  )
+
+  # create livy.conf in etc/conf dir
+  PropertiesFile(format("{livy_conf}/livy.conf"),
+                 properties=params.config['configurations']['livy-conf'],
+                 key_value_delimiter=" ",
+                 owner=params.livy_user,
+                 group=params.livy_group,
+  )
+
+  # create log4j.properties in etc/conf dir
+  File(os.path.join(params.livy_conf, 'log4j.properties'),
+       owner=params.livy_user,
+       group=params.livy_group,
+       content=params.livy_log4j_properties,
+       mode=0644,
+  )
+
+  # create spark-blacklist.conf in etc/conf dir
+  File(os.path.join(params.livy_conf, 'spark-blacklist.conf'),
+       owner=params.livy_user,
+       group=params.livy_group,
+       content=params.livy_spark_blacklist_properties,
+       mode=0644,
+  )
+
+  Directory(params.livy_logs_dir,
+            owner=params.livy_user,
+            group=params.livy_group,
+            mode=0755,
+  )
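Note: setup_livy.py renders livy.conf through PropertiesFile with key_value_delimiter=" ", because Livy expects space-separated "key value" lines rather than key=value pairs. A rough sketch of that rendering, assuming a plain dict of settings (render_properties is a hypothetical helper; the real PropertiesFile resource also manages ownership, mode, and change detection):

    def render_properties(props, delimiter=" "):
        # one "key<delimiter>value" line per entry, sorted for stable output
        return "\n".join(delimiter.join([k, str(props[k])]) for k in sorted(props))

    print render_properties({"livy.server.port": 8999, "livy.spark.master": "yarn-cluster"})
    # livy.server.port 8999
    # livy.spark.master yarn-cluster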
http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_spark.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_spark.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_spark.py
new file mode 100644
index 0000000..9329ce0
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/setup_spark.py
@@ -0,0 +1,116 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import fileinput
+import shutil
+import os
+
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.core.logger import Logger
+from resource_management.core import shell
+from resource_management.core.source import InlineTemplate
+from resource_management.core.resources.system import Directory, File
+from resource_management.libraries.resources.properties_file import PropertiesFile
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.xml_config import XmlConfig
+
+def setup_spark(env, type, upgrade_type=None, action=None):
+  import params
+
+  Directory([params.spark_pid_dir, params.spark_log_dir],
+            owner=params.spark_user,
+            group=params.user_group,
+            mode=0775,
+            create_parents=True
+  )
+  if type == 'server' and action == 'config':
+    params.HdfsResource(params.spark_hdfs_user_dir,
+                        type="directory",
+                        action="create_on_execute",
+                        owner=params.spark_user,
+                        mode=0775
+    )
+    params.HdfsResource(None, action="execute")
+
+  PropertiesFile(format("{spark_conf}/spark-defaults.conf"),
+                 properties=params.config['configurations']['spark-defaults'],
+                 key_value_delimiter=" ",
+                 owner=params.spark_user,
+                 group=params.spark_group,
+                 mode=0644
+  )
+
+  # create spark-env.sh in etc/conf dir
+  File(os.path.join(params.spark_conf, 'spark-env.sh'),
+       owner=params.spark_user,
+       group=params.spark_group,
+       content=InlineTemplate(params.spark_env_sh),
+       mode=0644,
+  )
+
+  # create log4j.properties in etc/conf dir
+  File(os.path.join(params.spark_conf, 'log4j.properties'),
+       owner=params.spark_user,
+       group=params.spark_group,
+       content=params.spark_log4j_properties,
+       mode=0644,
+  )
+
+  # create metrics.properties in etc/conf dir
+  File(os.path.join(params.spark_conf, 'metrics.properties'),
+       owner=params.spark_user,
+       group=params.spark_group,
+       content=InlineTemplate(params.spark_metrics_properties),
+       mode=0644
+  )
+
+  if params.is_hive_installed:
+    XmlConfig("hive-site.xml",
+              conf_dir=params.spark_conf,
+              configurations=params.spark_hive_properties,
+              owner=params.spark_user,
+              group=params.spark_group,
+              mode=0644)
+
+  if params.has_spark_thriftserver:
+    PropertiesFile(params.spark_thrift_server_conf_file,
+                   properties=params.config['configurations']['spark-thrift-sparkconf'],
+                   owner=params.hive_user,
+                   group=params.user_group,
+                   key_value_delimiter=" ",
+                   mode=0644
+    )
+
+  effective_version = params.version if upgrade_type is not None else params.stack_version_formatted
+  if effective_version:
+    effective_version = format_stack_version(effective_version)
+
+  if params.spark_thrift_fairscheduler_content and effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version):
+    # create spark-thrift-fairscheduler.xml
+    File(os.path.join(params.spark_conf, "spark-thrift-fairscheduler.xml"),
+         owner=params.spark_user,
+         group=params.spark_group,
+         mode=0755,
+         content=InlineTemplate(params.spark_thrift_fairscheduler_content)
+    )
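Note: when Hive is present, setup_spark.py emits the merged spark_hive_properties as hive-site.xml via XmlConfig. A simplified sketch of the markup that configuration dict maps to, assuming plain string values (to_hadoop_xml is a hypothetical helper; the real XmlConfig resource also escapes values and writes the file with the given owner, group, and mode):

    def to_hadoop_xml(conf):
        # minimal Hadoop-style configuration markup from a dict
        lines = ["<configuration>"]
        for name in sorted(conf):
            lines.append("  <property><name>%s</name><value>%s</value></property>" % (name, conf[name]))
        lines.append("</configuration>")
        return "\n".join(lines)

    print to_hadoop_xml({"hive.metastore.uris": "thrift://metastore-host:9083"})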
http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_client.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_client.py
new file mode 100644
index 0000000..a2e53cd
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_client.py
@@ -0,0 +1,60 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import conf_select, stack_select
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.core.exceptions import ClientComponentHasNoStatus
+from resource_management.core.logger import Logger
+from resource_management.core import shell
+from setup_spark import setup_spark
+
+
+class SparkClient(Script):
+  def install(self, env):
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env, upgrade_type=None, config_dir=None):
+    import params
+    env.set_params(params)
+
+    setup_spark(env, 'client', upgrade_type=upgrade_type, action='config')
+
+  def status(self, env):
+    raise ClientComponentHasNoStatus()
+
+  def get_component_name(self):
+    return "spark-client"
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    import params
+
+    env.set_params(params)
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      Logger.info("Executing Spark Client Stack Upgrade pre-restart")
+      conf_select.select(params.stack_name, "spark", params.version)
+      stack_select.select("spark-client", params.version)
+
+if __name__ == "__main__":
+  SparkClient().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_service.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_service.py
new file mode 100644
index 0000000..c6619e4
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/spark_service.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import socket
+import tarfile
+import os
+from contextlib import closing
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.copy_tarball import copy_to_hdfs, get_tarball_paths
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import File, Execute
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.constants import StackFeature
+from resource_management.libraries.functions.show_logs import show_logs
+
+
+def make_tarfile(output_filename, source_dir):
+  try:
+    os.remove(output_filename)
+  except OSError:
+    pass
+  parent_dir = os.path.dirname(output_filename)
+  if not os.path.exists(parent_dir):
+    os.makedirs(parent_dir)
+  os.chmod(parent_dir, 0711)
+  with closing(tarfile.open(output_filename, "w:gz")) as tar:
+    for file in os.listdir(source_dir):
+      tar.add(os.path.join(source_dir, file), arcname=file)
+  os.chmod(output_filename, 0644)
+
+
+def spark_service(name, upgrade_type=None, action=None):
+  import params
+
+  if action == 'start':
+
+    effective_version = params.version if upgrade_type is not None else params.stack_version_formatted
+    if effective_version:
+      effective_version = format_stack_version(effective_version)
+
+    if name == 'jobhistoryserver' and effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version):
+      # create & copy spark-hdp-yarn-archive.tar.gz to hdfs
+      if not params.sysprep_skip_copy_tarballs_hdfs:
+        source_dir = params.spark_home + "/jars"
+        tmp_archive_file = get_tarball_paths("spark")[1]
+        make_tarfile(tmp_archive_file, source_dir)
+        copy_to_hdfs("spark", params.user_group, params.hdfs_user, skip=params.sysprep_skip_copy_tarballs_hdfs, replace_existing_files=True)
+      # create spark history directory
+      params.HdfsResource(params.spark_history_dir,
+                          type="directory",
+                          action="create_on_execute",
+                          owner=params.spark_user,
+                          group=params.user_group,
+                          mode=0777,
+                          recursive_chmod=True
+      )
+      params.HdfsResource(None, action="execute")
+
+    if params.security_enabled:
+      spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
+      Execute(spark_kinit_cmd, user=params.spark_user)
+
+    # Spark 1.3.1.2.3 and higher (first shipped in HDP 2.3) has no dependency on Tez,
+    # so the Tez tarball only needs to be copied for older stacks.
+    if params.stack_version_formatted and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.stack_version_formatted):
+      resource_created = copy_to_hdfs("tez", params.user_group, params.hdfs_user, skip=params.sysprep_skip_copy_tarballs_hdfs)
+      if resource_created:
+        params.HdfsResource(None, action="execute")
+
+    if name == 'jobhistoryserver':
+      historyserver_no_op_test = format(
+        'ls {spark_history_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_history_server_pid_file}` >/dev/null 2>&1')
+      try:
+        Execute(format('{spark_history_server_start}'),
+                user=params.spark_user,
+                environment={'JAVA_HOME': params.java_home},
+                not_if=historyserver_no_op_test)
+      except:
+        show_logs(params.spark_log_dir, user=params.spark_user)
+        raise
+
+    elif name == 'sparkthriftserver':
+      if params.security_enabled:
+        hive_principal = params.hive_kerberos_principal
+        hive_kinit_cmd = format("{kinit_path_local} -kt {hive_kerberos_keytab} {hive_principal}; ")
+        Execute(hive_kinit_cmd, user=params.hive_user)
+
+      thriftserver_no_op_test = format(
+        'ls {spark_thrift_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_thrift_server_pid_file}` >/dev/null 2>&1')
+      try:
+        Execute(format('{spark_thrift_server_start} --properties-file {spark_thrift_server_conf_file} {spark_thrift_cmd_opts_properties}'),
+                user=params.hive_user,
+                environment={'JAVA_HOME': params.java_home},
+                not_if=thriftserver_no_op_test
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.hive_user)
+        raise
+  elif action == 'stop':
+    if name == 'jobhistoryserver':
+      try:
+        Execute(format('{spark_history_server_stop}'),
+                user=params.spark_user,
+                environment={'JAVA_HOME': params.java_home}
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.spark_user)
+        raise
+      File(params.spark_history_server_pid_file,
+           action="delete"
+      )
+
+    elif name == 'sparkthriftserver':
+      try:
+        Execute(format('{spark_thrift_server_stop}'),
+                user=params.hive_user,
+                environment={'JAVA_HOME': params.java_home}
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.hive_user)
+        raise
+      File(params.spark_thrift_server_pid_file,
+           action="delete"
+      )
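Note: the start commands in spark_service.py are guarded with not_if, a shell test that succeeds only when the pid file exists and the recorded process is still alive, so a running daemon is never started twice. A rough Python equivalent of that guard (daemon_is_running is a hypothetical helper, shown only to unpack the shell one-liner):

    import os

    def daemon_is_running(pid_file):
        # mirrors: ls PIDFILE >/dev/null 2>&1 && ps -p `cat PIDFILE` >/dev/null 2>&1
        try:
            pid = int(open(pid_file).read().strip())
            os.kill(pid, 0)   # signal 0 checks process existence without touching it
            return True
        except (IOError, ValueError, OSError):
            return False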
+ +""" + +import sys +import os + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.core.logger import Logger +from resource_management.core import shell +from setup_spark import setup_spark +from spark_service import spark_service + + +class SparkThriftServer(Script): + + def install(self, env): + import params + env.set_params(params) + + self.install_packages(env) + + def configure(self, env, upgrade_type=None, config_dir=None): + import params + env.set_params(params) + setup_spark(env, 'server', upgrade_type = upgrade_type, action = 'config') + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + self.configure(env) + spark_service('sparkthriftserver', upgrade_type=upgrade_type, action='start') + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + spark_service('sparkthriftserver', upgrade_type=upgrade_type, action='stop') + + def status(self, env): + import status_params + env.set_params(status_params) + check_process_status(status_params.spark_thrift_server_pid_file) + + def get_component_name(self): + return "spark-thriftserver" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + + env.set_params(params) + Logger.info("Executing Spark Thrift Server Stack Upgrade pre-restart") + conf_select.select(params.stack_name, "spark", params.version) + stack_select.select("spark-thriftserver", params.version) + + def get_log_folder(self): + import params + return params.spark_log_dir + + def get_user(self): + import params + return params.hive_user + + def get_pid_files(self): + import status_params + return [status_params.spark_thrift_server_pid_file] + +if __name__ == "__main__": + SparkThriftServer().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/status_params.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/status_params.py new file mode 100644 index 0000000..07dcc47 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/status_params.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +from resource_management.libraries.functions.format import format +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.default import default + +config = Script.get_config() + +spark_user = config['configurations']['spark-env']['spark_user'] +spark_group = config['configurations']['spark-env']['spark_group'] +user_group = config['configurations']['cluster-env']['user_group'] + +if 'hive-env' in config['configurations']: + hive_user = config['configurations']['hive-env']['hive_user'] +else: + hive_user = "hive" + +spark_pid_dir = config['configurations']['spark-env']['spark_pid_dir'] +spark_history_server_pid_file = format("{spark_pid_dir}/spark-{spark_user}-org.apache.spark.deploy.history.HistoryServer-1.pid") +spark_thrift_server_pid_file = format("{spark_pid_dir}/spark-{hive_user}-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1.pid") +stack_name = default("/hostLevelParams/stack_name", None) + +if "livy-env" in config['configurations']: + livy_user = config['configurations']['livy-env']['livy_user'] + livy_group = config['configurations']['livy-env']['livy_group'] + livy_pid_dir = config['configurations']['livy-env']['livy_pid_dir'] + livy_server_pid_file = format("{livy_pid_dir}/livy-{livy_user}-server.pid") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/1aba730c/ambari-server/src/main/resources/stacks/HDP/3.0/services/SPARK/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/SPARK/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/SPARK/metainfo.xml new file mode 100644 index 0000000..7fe44fe --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/SPARK/metainfo.xml @@ -0,0 +1,32 @@ +<?xml version="1.0"?> +<!-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<metainfo> + <schemaVersion>2.0</schemaVersion> + <services> + <service> + <name>SPARK</name> + <comment>Apache Spark is a fast and general engine for large-scale data processing</comment> + <version>2.2.0</version> + <selection>DEFAULT</selection> + <extends>common-services/SPARK/2.2.0</extends> + </service> + </services> +</metainfo>
