http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py new file mode 100644 index 0000000..78ec165 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +import re +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
+
+import status_params
+
+from ambari_commons.constants import AMBARI_SUDO_BINARY
+from ambari_commons import yaml_utils
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.get_bare_principal import get_bare_principal
+from resource_management.libraries.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.stack_features import get_stack_feature_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.expect import expect
+from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster
+from resource_management.libraries.functions import is_empty
+from ambari_commons.ambari_metrics_helper import select_metric_collector_hosts_from_hostnames
+from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config
+
+# Everything below is a module-level assignment evaluated once, at import time, mostly from
+# the configuration object returned by Script.get_config(). Some names are only bound on
+# particular branches; those cases are called out where they occur.
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = status_params.stack_root
+sudo = AMBARI_SUDO_BINARY
+
+limits_conf_dir = "/etc/security/limits.d"
+
+# Needed since this is an Atlas Hook service.
+cluster_name = config['clusterName']
+
+stack_name = status_params.stack_name
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+version = default("/commandParams/version", None)
+
+agent_stack_retry_on_unavailability = config['hostLevelParams']['agent_stack_retry_on_unavailability']
+agent_stack_retry_count = expect("/hostLevelParams/agent_stack_retry_count", int)
+
+storm_component_home_dir = status_params.storm_component_home_dir
+conf_dir = status_params.conf_dir
+
+stack_version_unformatted = status_params.stack_version_unformatted
+stack_version_formatted = status_params.stack_version_formatted
+# `version and check(...)` yields the falsy stack_version_formatted value itself when no
+# formatted version is available, so these flags are truthy/falsy rather than strict booleans.
+stack_supports_ru = stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted)
+stack_supports_storm_kerberos = stack_version_formatted and check_stack_feature(StackFeature.STORM_KERBEROS, stack_version_formatted)
+stack_supports_storm_ams = stack_version_formatted and check_stack_feature(StackFeature.STORM_AMS, stack_version_formatted)
+stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, stack_version_formatted)
+
+# get the correct version to use for checking stack features
+version_for_stack_feature_checks = get_stack_feature_version(config)
+
+stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
+stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
+
+# default Storm locations, used when the stack does not support rolling upgrade
+rest_lib_dir = "/usr/lib/storm/contrib/storm-rest"
+storm_bin_dir = "/usr/bin"
+storm_lib_dir = "/usr/lib/storm/lib/"
+
+# locations under the stack-selected Storm home (original note: parameters for 2.2+)
+if stack_supports_ru:
+  rest_lib_dir = format("{storm_component_home_dir}/contrib/storm-rest")
+  storm_bin_dir = format("{storm_component_home_dir}/bin")
+  storm_lib_dir = format("{storm_component_home_dir}/lib")
+  # NOTE(review): log4j_dir has no default above, so it is unbound when stack_supports_ru
+  # is falsy - confirm nothing reads it on such stacks.
+  log4j_dir = format("{storm_component_home_dir}/log4j2")
+
+storm_user = config['configurations']['storm-env']['storm_user']
+log_dir = config['configurations']['storm-env']['storm_log_dir']
+pid_dir = status_params.pid_dir
+local_dir = config['configurations']['storm-site']['storm.local.dir']
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+jps_binary = format("{java64_home}/bin/jps")
+nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port']
+storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None)
+storm_zookeeper_servers = config['configurations']['storm-site']['storm.zookeeper.servers']
+storm_zookeeper_port = config['configurations']['storm-site']['storm.zookeeper.port']
+storm_logs_supported = config['configurations']['storm-env']['storm_logs_supported']
+
+# nimbus.seeds is supported in HDP 2.3.0.0 and higher
+nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False)
+nimbus_host = default('/configurations/storm-site/nimbus.host', None)
+nimbus_seeds = default('/configurations/storm-site/nimbus.seeds', None)
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+#Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+  # for HA Nimbus
+  actual_topology_max_replication_wait_time_sec = -1
+  # Majority of the Nimbus hosts. Floor division keeps the count an integer whether or not
+  # `/` performs true division in the running interpreter.
+  actual_topology_min_replication_count = len(nimbus_hosts) // 2 + 1
+else:
+  # for non-HA Nimbus
+  actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+  actual_topology_min_replication_count = default_topology_min_replication_count
+
+# The two "*.default" helper keys have been read above and are now removed from storm-site
+# (presumably so they are not emitted into the generated Storm configuration - confirm).
+if 'topology.max.replication.wait.time.sec.default' in config['configurations']['storm-site']:
+  del config['configurations']['storm-site']['topology.max.replication.wait.time.sec.default']
+if 'topology.min.replication.count.default' in config['configurations']['storm-site']:
+  del config['configurations']['storm-site']['topology.min.replication.count.default']
+
+rest_api_port = "8745"
+rest_api_admin_port = "8746"
+rest_api_conf_file = format("{conf_dir}/config.yaml")
+storm_env_sh_template = config['configurations']['storm-env']['content']
+jmxremote_port = config['configurations']['storm-env']['jmxremote_port']
+
+# ganglia_server and ganglia_report_interval are bound only when a Ganglia host is listed.
+if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+
+storm_ui_host = default("/clusterHostInfo/storm_ui_server_hosts", [])
+
+storm_user_nofile_limit = default('/configurations/storm-env/storm_user_nofile_limit', 128000)
+# NOTE(review): the key read here is "storm_user_noproc_limit" while the variable (and the
+# sibling nofile key) use "nproc"/"nofile" - verify the key name against storm-env.
+storm_user_nproc_limit = default('/configurations/storm-env/storm_user_noproc_limit', 65536)
+
+# Kerberos principals and keytabs; every name in this section is bound only when security
+# is enabled (and, for the nested part, only when the stack supports Storm Kerberos).
+if security_enabled:
+  _hostname_lowercase = config['hostname'].lower()
+  _storm_principal_name = config['configurations']['storm-env']['storm_principal_name']
+  # "_HOST" in a configured principal is replaced with this host's lower-cased name
+  storm_jaas_principal = _storm_principal_name.replace('_HOST',_hostname_lowercase)
+  _ambari_principal_name = default('/configurations/cluster-env/ambari_principal_name', None)
+  storm_keytab_path = config['configurations']['storm-env']['storm_keytab']
+
+  if stack_supports_storm_kerberos:
+    storm_ui_keytab_path = config['configurations']['storm-env']['storm_ui_keytab']
+    _storm_ui_jaas_principal_name = config['configurations']['storm-env']['storm_ui_principal_name']
+    storm_ui_jaas_principal = _storm_ui_jaas_principal_name.replace('_HOST',_hostname_lowercase)
+    storm_bare_jaas_principal = get_bare_principal(_storm_principal_name)
+    if _ambari_principal_name:
+      ambari_bare_jaas_principal = get_bare_principal(_ambari_principal_name)
+    _nimbus_principal_name = config['configurations']['storm-env']['nimbus_principal_name']
+    nimbus_jaas_principal = _nimbus_principal_name.replace('_HOST', _hostname_lowercase)
+    nimbus_bare_jaas_principal = get_bare_principal(_nimbus_principal_name)
+    nimbus_keytab_path = config['configurations']['storm-env']['nimbus_keytab']
+
+kafka_bare_jaas_principal = None
+# storm_thrift_transport is bound only when the stack supports Storm Kerberos.
+if stack_supports_storm_kerberos:
+  if security_enabled:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+    # generate KafkaClient jaas config if kafka is kerberoized
+    _kafka_principal_name = default("/configurations/kafka-env/kafka_principal_name", None)
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+  else:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+# Metrics collector: externally configured hosts in cluster-env take precedence over the
+# collector hosts listed in clusterHostInfo.
+set_instanceId = "false"
+if 'cluster-env' in config['configurations'] and \
+    'metrics_collector_external_hosts' in config['configurations']['cluster-env']:
+  ams_collector_hosts = config['configurations']['cluster-env']['metrics_collector_external_hosts']
+  set_instanceId = "true"
+else:
+  ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+has_metric_collector = len(ams_collector_hosts) > 0
+metric_collector_port = None
+if has_metric_collector:
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_external_port' in config['configurations']['cluster-env']:
+    metric_collector_port = config['configurations']['cluster-env']['metrics_collector_external_port']
+  else:
+    metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
+    # take the port from "host:port", falling back to 6188 when no port is given
+    if ':' in metric_collector_web_address:
+      metric_collector_port = metric_collector_web_address.split(':')[1]
+    else:
+      metric_collector_port = '6188'
+
+  metric_collector_report_interval = 60
+  metric_collector_app_id = "nimbus"
+  if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
+    metric_collector_protocol = 'https'
+  else:
+    metric_collector_protocol = 'http'
+  metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+  metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+  metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
+metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
+metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
+
+# Cluster Zookeeper quorum, built as "host1:port,host2:port,..."
+zookeeper_quorum = ""
+if storm_zookeeper_servers:
+  storm_zookeeper_servers_list = yaml_utils.get_values_from_yaml_array(storm_zookeeper_servers)
+  # the join supplies ":port" after every host but the last; the last one is appended below
+  zookeeper_quorum = (":" + storm_zookeeper_port + ",").join(storm_zookeeper_servers_list)
+  zookeeper_quorum += ":" + storm_zookeeper_port
+
+jar_jvm_opts = ''
+
+########################################################
+############# Atlas related params #####################
+########################################################
+#region Atlas Hooks
+storm_atlas_application_properties = default('/configurations/storm-atlas-application.properties', {})
+enable_atlas_hook = default('/configurations/storm-env/storm.atlas.hook', False)
+atlas_hook_filename = default('/configurations/atlas-env/metadata_conf_file', 'atlas-application.properties')
+
+if enable_atlas_hook:
+  # Only append /etc/atlas/conf to classpath if on HDP 2.4.*
+  if check_stack_feature(StackFeature.ATLAS_CONF_DIR_IN_PATH, stack_version_formatted):
+    atlas_conf_dir = format('{stack_root}/current/atlas-server/conf')
+    jar_jvm_opts += '-Datlas.conf=' + atlas_conf_dir
+#endregion
+
+storm_ui_port = config['configurations']['storm-site']['ui.port']
+
+#Storm log4j properties
+storm_a1_maxfilesize = default('/configurations/storm-cluster-log4j/storm_a1_maxfilesize', 100)
+storm_a1_maxbackupindex = default('/configurations/storm-cluster-log4j/storm_a1_maxbackupindex', 9)
+storm_wrkr_a1_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxfilesize', 100)
+storm_wrkr_a1_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxbackupindex', 9)
+storm_wrkr_out_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxfilesize', 100)
+storm_wrkr_out_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxbackupindex', 4)
+storm_wrkr_err_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxfilesize', 100)
+storm_wrkr_err_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxbackupindex', 4)
+
+storm_cluster_log4j_content = config['configurations']['storm-cluster-log4j']['content']
+storm_worker_log4j_content = config['configurations']['storm-worker-log4j']['content']
+
+# some commands may need to supply the JAAS location when running as storm
+storm_jaas_file = format("{conf_dir}/storm_jaas.conf")
+
+# for curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+
+# ranger storm plugin start section
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = len(ranger_admin_hosts) > 0
+
+# ranger support xml_configuration flag, instead of depending on ranger xml_configurations_supported/ranger-env, using stack feature
+xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)
+
+# ambari-server hostname
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+# ranger storm plugin enabled property: the configured "Yes"/"No" text (any case) becomes a bool
+enable_ranger_storm = default("/configurations/ranger-storm-plugin-properties/ranger-storm-plugin-enabled", "No")
+enable_ranger_storm = enable_ranger_storm.lower() == 'yes'
+
+# ranger storm properties; every name assigned in this section exists only when the plugin is enabled
+if enable_ranger_storm:
+  # get ranger policy url
+  policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url']
+  if xml_configurations_supported:
+    policymgr_mgr_url = config['configurations']['ranger-storm-security']['ranger.plugin.storm.policy.rest.url']
+
+  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
+    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')
+
+  # ranger audit db user
+  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')
+
+  # ranger storm service name; a configured name wins unless it is the unexpanded template placeholder
+  repo_name = str(config['clusterName']) + '_storm'
+  repo_name_value = config['configurations']['ranger-storm-security']['ranger.plugin.storm.service.name']
+  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
+    repo_name = repo_name_value
+
+  common_name_for_certificate = config['configurations']['ranger-storm-plugin-properties']['common.name.for.certificate']
+  repo_config_username = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_USERNAME']
+
+  # ranger-env config
+  ranger_env = config['configurations']['ranger-env']
+
+  # create ranger-env config having external ranger credential properties
+  # (enable_ranger_storm is already true on this branch, so only the admin-host test is needed)
+  if not has_ranger_admin:
+    external_admin_username = default('/configurations/ranger-storm-plugin-properties/external_admin_username', 'admin')
+    external_admin_password = default('/configurations/ranger-storm-plugin-properties/external_admin_password', 'admin')
+    external_ranger_admin_username = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
+    external_ranger_admin_password = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
+    ranger_env = {
+      'admin_username': external_admin_username,
+      'admin_password': external_admin_password,
+      'ranger_admin_username': external_ranger_admin_username,
+      'ranger_admin_password': external_ranger_admin_password
+    }
+
+  ranger_plugin_properties = config['configurations']['ranger-storm-plugin-properties']
+  policy_user = storm_user
+  repo_config_password = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']
+
+  xa_audit_db_password = ''
+  if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
+    xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']
+
+  downloaded_custom_connector = None
+  previous_jdbc_jar_name = None
+  driver_curl_source = None
+  driver_curl_target = None
+  previous_jdbc_jar = None
+
+  if has_ranger_admin and stack_supports_ranger_audit_db:
+    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
+    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)
+
+    # stack_supports_ranger_audit_db is known to be truthy here, so the paths are set unconditionally
+    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
+    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}")
+    driver_curl_target = format("{storm_component_home_dir}/lib/{jdbc_jar_name}")
+    previous_jdbc_jar = format("{storm_component_home_dir}/lib/{previous_jdbc_jar_name}")
+    sql_connector_jar = ''
+
+  # NOTE(review): storm_ui_host defaults to [] when no UI server host is listed, in which
+  # case the [0] below raises IndexError - confirm a UI host is always present here.
+  storm_ranger_plugin_config = {
+    'username': repo_config_username,
+    'password': repo_config_password,
+    'nimbus.url': 'http://' + storm_ui_host[0].lower() + ':' + str(storm_ui_port),
+    'commonNameForCertificate': common_name_for_certificate
+  }
+
+  # repository payload with the config serialized to a JSON string
+  storm_ranger_plugin_repo = {
+    'isActive': 'true',
+    'config': json.dumps(storm_ranger_plugin_config),
+    'description': 'storm repo',
+    'name': repo_name,
+    'repositoryType': 'storm',
+    'assetType': '6'
+  }
+
+  # this update happens after the JSON payload above was serialized, so it is only
+  # reflected in the dict-based payload built on the Kerberos branch below
+  custom_ranger_service_config = generate_ranger_service_config(ranger_plugin_properties)
+  if custom_ranger_service_config:
+    storm_ranger_plugin_config.update(custom_ranger_service_config)
+
+  if stack_supports_ranger_kerberos and security_enabled:
+    # NOTE(review): storm_bare_jaas_principal is bound elsewhere in this module; confirm
+    # it is always defined whenever this branch is taken.
+    policy_user = format('{storm_user},{storm_bare_jaas_principal}')
+    storm_ranger_plugin_config['policy.download.auth.users'] = policy_user
+    storm_ranger_plugin_config['tag.download.auth.users'] = policy_user
+    storm_ranger_plugin_config['ambari.service.check.user'] = policy_user
+
+    # replaces the payload built above; here the config is passed as a dict, not a JSON string
+    storm_ranger_plugin_repo = {
+      'isEnabled': 'true',
+      'configs': storm_ranger_plugin_config,
+      'description': 'storm repo',
+      'name': repo_name,
+      'type': 'storm'
+    }
+
+  ranger_storm_principal = None
+  ranger_storm_keytab = None
+  if stack_supports_ranger_kerberos and security_enabled:
+    ranger_storm_principal = storm_jaas_principal
+    ranger_storm_keytab = storm_keytab_path
+
+  xa_audit_db_is_enabled = False
+  if xml_configurations_supported and stack_supports_ranger_audit_db:
+    xa_audit_db_is_enabled = config['configurations']['ranger-storm-audit']['xasecure.audit.destination.db']
+
+  xa_audit_hdfs_is_enabled = default('/configurations/ranger-storm-audit/xasecure.audit.destination.hdfs', False)
+  ssl_keystore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
+  ssl_truststore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
+  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
+
+  # for SQLA explicitly disable audit to DB for Ranger
+  # (xa_audit_db_flavor is bound under exactly the first two conditions tested here)
+  if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
+    xa_audit_db_is_enabled = False
+
+# ranger storm plugin end section
+
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = len(namenode_hosts) > 0
+
+availableServices = config['availableServices']
+
+# HDFS settings are read only when a NameNode host is listed; otherwise they are None
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+import functools
+#create partial functions with common arguments for every HdfsResource call
+#to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code
+# (the imported HdfsResource name is rebound here to the pre-configured partial)
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources()
+)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
new file mode 100644
index 0000000..a758375
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+# NOTE(review): `os` and the *_win_service_name values used below are not imported by name
+# in this file; presumably they arrive through this star import - confirm in status_params.
+from status_params import *
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+
+# server configurations
+config = Script.get_config()
+
+stack_is_hdp23_or_further = Script.is_stack_greater_or_equal("2.3")
+
+# stack root is the parent directory of HADOOP_HOME; both environment variables are required
+stack_root = os.path.abspath(os.path.join(os.environ["HADOOP_HOME"],".."))
+conf_dir = os.environ["STORM_CONF_DIR"]
+hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"]
+storm_user = hadoop_user
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+#Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+  # for HA Nimbus
+  actual_topology_max_replication_wait_time_sec = -1
+  # Majority of the Nimbus hosts. Floor division keeps the count an integer whether or not
+  # `/` performs true division in the running interpreter.
+  actual_topology_min_replication_count = len(nimbus_hosts) // 2 + 1
+else:
+  # for non-HA Nimbus
+  actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+  actual_topology_min_replication_count = default_topology_min_replication_count
+
+# storm_thrift_transport is bound only when the stack is 2.3 or later
+if stack_is_hdp23_or_further:
+  if security_enabled:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+  else:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+# component name -> Windows service name
+service_map = {
+  "nimbus" : nimbus_win_service_name,
+  "supervisor" : supervisor_win_service_name,
+  "ui" : ui_win_service_name
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
new file mode 100644
index 0000000..f9b3b80
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+
+from storm import storm
+from service import service
+from service_check import ServiceCheck
+
+
+class StormRestApi(Script):
+  """
+  Lifecycle commands for the Storm REST API component.
+
+  The component shipped with HDP 2.0 and 2.1; HDP 2.2 dropped it once the
+  Storm UI Server took over its functionality.
+  """
+
+  # name under which this component is known to service()
+  _SERVICE_NAME = "rest_api"
+
+  @staticmethod
+  def _publish_params(env):
+    # Import lazily, as every command did before, and hand the module to the environment.
+    import params
+    env.set_params(params)
+
+  def get_component_name(self):
+    return "storm-client"
+
+  def install(self, env):
+    # install the packages, then lay down the configuration
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    self._publish_params(env)
+    storm()
+
+  def start(self, env, upgrade_type=None):
+    # configuration is always refreshed before the daemon is started
+    self._publish_params(env)
+    self.configure(env)
+    service(self._SERVICE_NAME, action="start")
+
+  def stop(self, env, upgrade_type=None):
+    self._publish_params(env)
+    service(self._SERVICE_NAME, action="stop")
+
+  def status(self, env):
+    # status only needs the lightweight status_params module
+    import status_params as status
+    env.set_params(status)
+    check_process_status(status.pid_rest_api)
+
+  def get_log_folder(self):
+    import params as p
+    return p.log_dir
+
+  def get_user(self):
+    import params as p
+    return p.storm_user
+
+  def get_pid_files(self):
+    import status_params as status
+    return [status.pid_rest_api]
+
+if __name__ == "__main__":
+  StormRestApi().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
new file mode 100644
index 0000000..b5e5cd5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.core.resources import Execute
+from resource_management.core.resources import File
+from resource_management.core.shell import as_user
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_user_call_output
+from resource_management.libraries.functions.show_logs import show_logs
+import time
+
+
+def service(name, action = 'start'):
+  """
+  Start or stop one Storm daemon and keep its pid file in step.
+
+  :param name: key into status_params.pid_files; for everything except "rest_api" it is
+    also the sub-command handed to the `storm` launcher
+  :param action: "start" or "stop"; any other value falls through without doing anything
+  :raises KeyError: if status_params.pid_files has no entry for name (assuming it is a dict)
+  """
+  import params
+  import status_params
+
+  pid_file = status_params.pid_files[name]
+  # Shell test that succeeds only when the pid file exists and `ps` finds the pid stored in
+  # it. It is passed as not_if to the start command and, negated, to the kill commands.
+  no_op_test = as_user(format(
+    "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.storm_user)
+
+  if action == "start":
+    # only the start commands need the Storm environment sourced
+    storm_env = format(
+      "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
+
+    if name == "rest_api":
+      # Raw string: the "\." is for grep and must not be read as a Python escape sequence.
+      process_cmd = format(
+        r"{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
+      cmd = format(
+        "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
+    else:
+      # Storm start script gets forked into actual storm java process.
+      # Which means we can use the pid of start script as a pid of start component
+      cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1")
+
+    # run in the background and record the shell's $! in the pid file
+    cmd = format("{cmd} &\n echo $! > {pid_file}")
+
+    Execute(cmd,
+      not_if = no_op_test,
+      user = params.storm_user,
+      path = params.storm_bin_dir,
+    )
+
+    File(pid_file,
+      owner = params.storm_user,
+      group = params.user_group
+    )
+  elif action == "stop":
+    process_dont_exist = format("! ({no_op_test})")
+    # without a pid file there is nothing to kill and nothing to clean up
+    if os.path.exists(pid_file):
+      # element [1] of the result is used as the pid file contents (presumably the command's stdout)
+      pid = get_user_call_output.get_user_call_output(format("! test -f {pid_file} || cat {pid_file}"), user=params.storm_user)[1]
+
+      # if multiple processes are running (for example user can start logviewer from console)
+      # there can be more than one id
+      pid = pid.replace("\n", " ")
+
+      Execute(format("{sudo} kill {pid}"),
+        not_if = process_dont_exist)
+
+      # escalate to SIGKILL unless the process is seen gone after 2s or after a further 20s
+      Execute(format("{sudo} kill -9 {pid}"),
+        not_if = format(
+          "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
+        ignore_failures = True)
+
+      File(pid_file, action = "delete")
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
new file mode 100644
index 0000000..80ea0f5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.
The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.core.resources import File
+from resource_management.core.resources import Execute
+from resource_management.libraries.script import Script
+from resource_management.core.source import StaticFile
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+
+# Base class; the OS-specific subclasses below are registered through the
+# OsFamilyImpl decorator.
+class ServiceCheck(Script):
+  pass
+
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class ServiceCheckWindows(ServiceCheck):
+  def service_check(self, env):
+    # Runs "<stack_root>/Run-SmokeTests.cmd STORM" through cmd /C as the
+    # Storm user.
+    import params
+    env.set_params(params)
+    smoke_cmd = os.path.join(params.stack_root,"Run-SmokeTests.cmd")
+    service = "STORM"
+    Execute(format("cmd /C {smoke_cmd} {service}", smoke_cmd=smoke_cmd, service=service), user=params.storm_user, logoutput=True)
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class ServiceCheckDefault(ServiceCheck):
+  def service_check(self, env):
+    # Copies wordCount.jar to /tmp, submits it as topology WordCount<unique>
+    # and then kills that topology.
+    import params
+    env.set_params(params)
+
+    unique = get_unique_id_and_date()
+
+    File("/tmp/wordCount.jar",
+         content=StaticFile("wordCount.jar"),
+         owner=params.storm_user
+    )
+
+    # NOTE(review): cmd stays "" when nimbus_seeds_supported is false and
+    # nimbus_host is None, and is still passed to Execute below - confirm
+    # whether that combination can occur.
+    cmd = ""
+    if params.nimbus_seeds_supported:
+      # Because this command is guaranteed to run on one of the hosts with storm client, there is no need
+      # to specify "-c nimbus.seeds={nimbus_seeds}"
+      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique}")
+    elif params.nimbus_host is not None:
+      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique} -c nimbus.host={nimbus_host}")
+
+    Execute(cmd,
+            logoutput=True,
+            path=params.storm_bin_dir,
+            user=params.storm_user
+    )
+
+    Execute(format("storm kill WordCount{unique}"),
+            path=params.storm_bin_dir,
+            user=params.storm_user
+    )
+
+if __name__ == "__main__":
+  ServiceCheck().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
new file mode 100644
index 0000000..c04496e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_core_site_for_required_plugins
+from resource_management.libraries.resources.xml_config import XmlConfig
+from resource_management.libraries.functions.format import format
+from resource_management.core.resources import File, Directory
+
+def setup_ranger_storm(upgrade_type=None):
+  """
+  Set up the Ranger plugin for Storm. All work happens only when both
+  params.enable_ranger_storm and params.security_enabled are true; otherwise
+  a single "not enabled" message is logged.
+
+  :param upgrade_type: Upgrade Type such as "rolling" or "nonrolling"
+  """
+  import params
+  if params.enable_ranger_storm and params.security_enabled:
+
+    # Only pin the stack version while an upgrade is in progress.
+    stack_version = None
+    if upgrade_type is not None:
+      stack_version = params.version
+
+    if params.retryAble:
+      Logger.info("Storm: Setup ranger: command retry enables thus retrying if ranger admin is down !")
+    else:
+      Logger.info("Storm: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")
+
+    # Create the HDFS audit directories; the two declarations are queued with
+    # create_on_execute and flushed by the final action="execute" call.
+    if params.xml_configurations_supported and params.enable_ranger_storm and params.xa_audit_hdfs_is_enabled:
+      if params.has_namenode:
+        params.HdfsResource("/ranger/audit",
+                            type="directory",
+                            action="create_on_execute",
+                            owner=params.hdfs_user,
+                            group=params.hdfs_user,
+                            mode=0755,
+                            recursive_chmod=True
+        )
+        params.HdfsResource("/ranger/audit/storm",
+                            type="directory",
+                            action="create_on_execute",
+                            owner=params.storm_user,
+                            group=params.storm_user,
+                            mode=0700,
+                            recursive_chmod=True
+        )
+        params.HdfsResource(None, action="execute")
+
+    # Two setup_ranger_plugin implementations exist; the XML-based one also
+    # receives the api_version / kerberos arguments.
+    if params.xml_configurations_supported:
+      api_version=None
+      if params.stack_supports_ranger_kerberos:
+        api_version='v2'
+      from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin
+      setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+                          params.downloaded_custom_connector, params.driver_curl_source,
+                          params.driver_curl_target, params.java64_home,
+                          params.repo_name, params.storm_ranger_plugin_repo,
+                          params.ranger_env, params.ranger_plugin_properties,
+                          params.policy_user, params.policymgr_mgr_url,
+                          params.enable_ranger_storm, conf_dict=params.conf_dir,
+                          component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+                          plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+                          plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+                          plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+                          component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                          credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+                          ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+                          stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble,api_version=api_version,
+                          is_security_enabled = params.security_enabled,
+                          is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos,
+                          component_user_principal=params.ranger_storm_principal if params.security_enabled else None,
+                          component_user_keytab=params.ranger_storm_keytab if params.security_enabled else None)
+    else:
+      from resource_management.libraries.functions.setup_ranger_plugin import setup_ranger_plugin
+      setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+                          params.downloaded_custom_connector, params.driver_curl_source,
+                          params.driver_curl_target, params.java64_home,
+                          params.repo_name, params.storm_ranger_plugin_repo,
+                          params.ranger_env, params.ranger_plugin_properties,
+                          params.policy_user, params.policymgr_mgr_url,
+                          params.enable_ranger_storm, conf_dict=params.conf_dir,
+                          component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+                          plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+                          plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+                          plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+                          component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                          credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+                          ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+                          stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble)
+
+
+    # Directory that receives core-site.xml / hdfs-site.xml for the plugin.
+    site_files_create_path = format('{storm_component_home_dir}/extlib-daemon/ranger-storm-plugin-impl/conf')
+    Directory(site_files_create_path,
+              owner = params.storm_user,
+              group = params.user_group,
+              mode=0775,
+              create_parents = True,
+              cd_access = 'a'
+    )
+
+    if params.stack_supports_core_site_for_ranger_plugin and params.enable_ranger_storm and params.has_namenode and params.security_enabled:
+      Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating create core-site.xml from namenode configuraitions")
+      setup_core_site_for_required_plugins(component_user=params.storm_user,component_group=params.user_group,create_core_site_path = site_files_create_path, config = params.config)
+      # hdfs-site.xml is written only when more than one NameNode host is
+      # configured; otherwise any existing copy is removed.
+      if len(params.namenode_hosts) > 1:
+        Logger.info('Ranger Storm plugin is enabled along with security and NameNode is HA , creating hdfs-site.xml')
+        XmlConfig("hdfs-site.xml",
+                  conf_dir=site_files_create_path,
+                  configurations=params.config['configurations']['hdfs-site'],
+                  configuration_attributes=params.config['configuration_attributes']['hdfs-site'],
+                  owner=params.storm_user,
+                  group=params.user_group,
+                  mode=0644
+        )
+      else:
+        # NOTE(review): this branch is reached when there is at most one
+        # NameNode host, which is not what the message below says.
+        Logger.info('Ranger Storm plugin is not enabled or security is disabled, removing hdfs-site.xml')
+        File(format('{site_files_create_path}/hdfs-site.xml'), action="delete")
+    else:
+      Logger.info("Stack does not support core-site.xml creation for Ranger plugin, skipping core-site.xml configurations")
+  else:
+    Logger.info('Ranger Storm plugin is not enabled')
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
new file mode 100644
index 0000000..d84b095
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions import default, format
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from ambari_commons import OSCheck
+
+# a map of the Ambari role to the component name
+# for use with <stack-root>/current/<component>
+SERVER_ROLE_DIRECTORY_MAP = {
+  'NIMBUS' : 'storm-nimbus',
+  'SUPERVISOR' : 'storm-supervisor',
+  'STORM_UI_SERVER' : 'storm-client',
+  'DRPC_SERVER' : 'storm-client',
+  'STORM_SERVICE_CHECK' : 'storm-client'
+}
+
+# "STORM_SERVICE_CHECK" is the second argument passed to the lookup helper.
+component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "STORM_SERVICE_CHECK")
+
+config = Script.get_config()
+stack_root = Script.get_stack_root()
+stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+
+# Windows only defines service names; every pid/conf/security value below is
+# defined on the non-Windows branch only.
+if OSCheck.is_windows_family():
+  nimbus_win_service_name = "nimbus"
+  supervisor_win_service_name = "supervisor"
+  ui_win_service_name = "ui"
+else:
+  pid_dir = config['configurations']['storm-env']['storm_pid_dir']
+  pid_nimbus = format("{pid_dir}/nimbus.pid")
+  pid_supervisor = format("{pid_dir}/supervisor.pid")
+  pid_drpc = format("{pid_dir}/drpc.pid")
+  pid_ui = format("{pid_dir}/ui.pid")
+  pid_logviewer = format("{pid_dir}/logviewer.pid")
+  pid_rest_api = format("{pid_dir}/restapi.pid")
+
+  # Daemon name -> pid file path.
+  pid_files = {
+    "logviewer":pid_logviewer,
+    "ui": pid_ui,
+    "nimbus": pid_nimbus,
+    "supervisor": pid_supervisor,
+    "drpc": pid_drpc,
+    "rest_api": pid_rest_api
+  }
+
+  # Security related/required params
+  hostname = config['hostname']
+  security_enabled = config['configurations']['cluster-env']['security_enabled']
+  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+  tmp_dir = Script.get_tmp_dir()
+
+  # Fixed locations by default; switched to <stack_root>/current/<component>
+  # when the stack version passes the ROLLING_UPGRADE feature check.
+  storm_component_home_dir = "/usr/lib/storm"
+  conf_dir = "/etc/storm/conf"
+  if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+    storm_component_home_dir = format("{stack_root}/current/{component_directory}")
+    conf_dir = format("{stack_root}/current/{component_directory}/conf")
+
+  storm_user = config['configurations']['storm-env']['storm_user']
+  storm_ui_principal = default('/configurations/storm-env/storm_ui_principal_name', None)
+  storm_ui_keytab = default('/configurations/storm-env/storm_ui_keytab', None)
+
+stack_name = default("/hostLevelParams/stack_name", None)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
new file mode 100644
index 0000000..99579d2
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management.core.exceptions import Fail +from resource_management.core.resources.service import ServiceConfig +from resource_management.core.resources.system import Directory, Execute, File, Link +from resource_management.core.source import InlineTemplate +from resource_management.libraries.resources.template_config import TemplateConfig +from resource_management.libraries.functions.format import format +from resource_management.libraries.script.script import Script +from resource_management.core.source import Template +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature +from storm_yaml_utils import yaml_config_template, yaml_config +from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl +from ambari_commons import OSConst +from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster, setup_atlas_hook, setup_atlas_jar_symlinks +from ambari_commons.constants import SERVICE + + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def storm(name=None): + import params + yaml_config("storm.yaml", + conf_dir=params.conf_dir, + configurations=params.config['configurations']['storm-site'], + owner=params.storm_user + ) + + if params.service_map.has_key(name): + service_name = params.service_map[name] + ServiceConfig(service_name, + action="change_user", + username = params.storm_user, + password = Script.get_password(params.storm_user)) + + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def 
storm(name=None): + import params + import os + + Directory(params.log_dir, + owner=params.storm_user, + group=params.user_group, + mode=0777, + create_parents = True, + cd_access="a", + ) + + Directory([params.pid_dir, params.local_dir], + owner=params.storm_user, + group=params.user_group, + create_parents = True, + cd_access="a", + mode=0755, + ) + + Directory(params.conf_dir, + group=params.user_group, + create_parents = True, + cd_access="a", + ) + + File(format("{limits_conf_dir}/storm.conf"), + owner='root', + group='root', + mode=0644, + content=Template("storm.conf.j2") + ) + + File(format("{conf_dir}/config.yaml"), + content=Template("config.yaml.j2"), + owner=params.storm_user, + group=params.user_group + ) + + configurations = params.config['configurations']['storm-site'] + + File(format("{conf_dir}/storm.yaml"), + content=yaml_config_template(configurations), + owner=params.storm_user, + group=params.user_group + ) + + File(format("{conf_dir}/storm-env.sh"), + owner=params.storm_user, + content=InlineTemplate(params.storm_env_sh_template) + ) + + # Generate atlas-application.properties.xml file and symlink the hook jars + if params.enable_atlas_hook: + atlas_hook_filepath = os.path.join(params.conf_dir, params.atlas_hook_filename) + setup_atlas_hook(SERVICE.STORM, params.storm_atlas_application_properties, atlas_hook_filepath, params.storm_user, params.user_group) + storm_extlib_dir = os.path.join(params.storm_component_home_dir, "extlib") + setup_atlas_jar_symlinks("storm", storm_extlib_dir) + + if params.has_metric_collector: + File(format("{conf_dir}/storm-metrics2.properties"), + owner=params.storm_user, + group=params.user_group, + content=Template("storm-metrics2.properties.j2") + ) + + # Remove symlinks. 
They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2 + Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"), + action="delete") + # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions + Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete") + + if check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES, params.version_for_stack_feature_checks): + sink_jar = params.metric_collector_sink_jar + else: + sink_jar = params.metric_collector_legacy_sink_jar + + Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"), + not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"), + only_if=format("ls {sink_jar}") + ) + + if params.storm_logs_supported: + Directory(params.log4j_dir, + owner=params.storm_user, + group=params.user_group, + mode=0755, + create_parents = True + ) + + File(format("{log4j_dir}/cluster.xml"), + owner=params.storm_user, + content=InlineTemplate(params.storm_cluster_log4j_content) + ) + File(format("{log4j_dir}/worker.xml"), + owner=params.storm_user, + content=InlineTemplate(params.storm_worker_log4j_content) + ) + + if params.security_enabled: + TemplateConfig(format("{conf_dir}/storm_jaas.conf"), + owner=params.storm_user + ) + if params.stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.stack_version_formatted): + TemplateConfig(format("{conf_dir}/client_jaas.conf"), + owner=params.storm_user + ) + minRuid = configurations['_storm.min.ruid'] if configurations.has_key('_storm.min.ruid') else '' + + min_user_ruid = int(minRuid) if minRuid.isdigit() else _find_real_user_min_uid() + + File(format("{conf_dir}/worker-launcher.cfg"), + content=Template("worker-launcher.cfg.j2", min_user_ruid = min_user_ruid), + owner='root', + group=params.user_group + ) + + +''' +Finds minimal real user UID +''' +def _find_real_user_min_uid(): + with open('/etc/login.defs') as f: + for line in f: + if 
line.strip().startswith('UID_MIN') and len(line.split()) == 2 and line.split()[1].isdigit(): + return int(line.split()[1]) + raise Fail("Unable to find UID_MIN in file /etc/login.defs. Expecting format e.g.: 'UID_MIN 500'") http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py new file mode 100644 index 0000000..bc245c4 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py @@ -0,0 +1,177 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
+import os + +from ambari_commons import yaml_utils +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Directory +from resource_management.core.resources.system import File +from resource_management.core.resources.system import Execute +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions.format import format + +class StormUpgrade(Script): + """ + Applies to Rolling/Express Upgrade from HDP 2.1 or 2.2 to 2.3 or higher. + + Requirements: Needs to run from a host with ZooKeeper Client. + + This class helps perform some of the upgrade tasks needed for Storm during + a Rolling or Express upgrade. Storm writes data to disk locally and to ZooKeeper. + If any HDP 2.1 or 2.2 bits exist in these directories when an HDP 2.3 instance + starts up, it will fail to start properly. Because the upgrade framework in + Ambari doesn't yet have a mechanism to say "stop all" before starting to + upgrade each component, we need to rely on a Storm trick to bring down + running daemons. By removing the ZooKeeper data with running daemons, those + daemons will die. + """ + + def delete_storm_zookeeper_data(self, env): + """ + Deletes the Storm data from ZooKeeper, effectively bringing down all + Storm daemons. + :return: + """ + import params + + Logger.info('Clearing Storm data from ZooKeeper') + + storm_zookeeper_root_dir = params.storm_zookeeper_root_dir + if storm_zookeeper_root_dir is None: + raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified") + + # The zookeeper client must be given a zookeeper host to contact. Guaranteed to have at least one host. 
+ storm_zookeeper_server_list = yaml_utils.get_values_from_yaml_array(params.storm_zookeeper_servers) + if storm_zookeeper_server_list is None: + Logger.info("Unable to extract ZooKeeper hosts from '{0}', assuming localhost").format(params.storm_zookeeper_servers) + storm_zookeeper_server_list = ["localhost"] + + # For every zk server, try to remove /storm + zookeeper_data_cleared = False + for storm_zookeeper_server in storm_zookeeper_server_list: + # Determine where the zkCli.sh shell script is + zk_command_location = os.path.join(params.stack_root, "current", "zookeeper-client", "bin", "zkCli.sh") + if params.version is not None: + zk_command_location = os.path.join(params.stack_root, params.version, "zookeeper", "bin", "zkCli.sh") + + # create the ZooKeeper delete command + command = "{0} -server {1}:{2} rmr /storm".format( + zk_command_location, storm_zookeeper_server, params.storm_zookeeper_port) + + # clean out ZK + try: + # the ZK client requires Java to run; ensure it's on the path + env_map = { + 'JAVA_HOME': params.java64_home + } + + # AMBARI-12094: if security is enabled, then we need to tell zookeeper where the + # JAAS file is located since we don't use kinit directly with STORM + if params.security_enabled: + env_map['JVMFLAGS'] = "-Djava.security.auth.login.config={0}".format(params.storm_jaas_file) + + Execute(command, user=params.storm_user, environment=env_map, + logoutput=True, tries=1) + + zookeeper_data_cleared = True + break + except: + # the command failed, try a different ZK server + pass + + # fail if the ZK data could not be cleared + if not zookeeper_data_cleared: + raise Fail("Unable to clear ZooKeeper Storm data on any of the following ZooKeeper hosts: {0}".format( + storm_zookeeper_server_list)) + + + def delete_storm_local_data(self, env): + """ + Deletes Storm data from local directories. This will create a marker file + with JSON data representing the upgrade stack and request/stage ID. 
This + will prevent multiple Storm components on the same host from removing + the local directories more than once. + :return: + """ + import params + + Logger.info('Clearing Storm data from local directories...') + + storm_local_directory = params.local_dir + if storm_local_directory is None: + raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified") + + request_id = default("/requestId", None) + + stack_name = params.stack_name + stack_version = params.version + upgrade_direction = params.upgrade_direction + + json_map = {} + json_map["requestId"] = request_id + json_map["stackName"] = stack_name + json_map["stackVersion"] = stack_version + json_map["direction"] = upgrade_direction + + temp_directory = params.tmp_dir + marker_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version)) + Logger.info("Marker file for upgrade/downgrade of Storm, {0}".format(marker_file)) + + if os.path.exists(marker_file): + Logger.info("The marker file exists.") + try: + with open(marker_file) as file_pointer: + existing_json_map = json.load(file_pointer) + + if cmp(json_map, existing_json_map) == 0: + Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for " + "request {2} and direction {3}. Nothing else to do.".format(stack_name, stack_version, request_id, upgrade_direction)) + + # Nothing else to do here for this as it appears to have already been + # removed by another component being upgraded + return + else: + Logger.info("The marker file differs from the new value. Will proceed to delete Storm local dir, " + "and generate new file. Current marker file: {0}".format(str(existing_json_map))) + except Exception, e: + Logger.error("The marker file {0} appears to be corrupt; removing it. 
Error: {1}".format(marker_file, str(e))) + File(marker_file, action="delete") + else: + Logger.info('The marker file {0} does not exist; will attempt to delete local Storm directory if it exists.'.format(marker_file)) + + # Delete from local directory + if os.path.isdir(storm_local_directory): + Logger.info("Deleting storm local directory, {0}".format(storm_local_directory)) + Directory(storm_local_directory, action="delete", create_parents = True) + + # Recreate storm local directory + Logger.info("Recreating storm local directory, {0}".format(storm_local_directory)) + Directory(storm_local_directory, mode=0755, owner=params.storm_user, + group=params.user_group, create_parents = True) + + # The file doesn't exist, so create it + Logger.info("Saving marker file to {0} with contents: {1}".format(marker_file, str(json_map))) + with open(marker_file, 'w') as file_pointer: + json.dump(json_map, file_pointer, indent=2) + +if __name__ == "__main__": + StormUpgrade().execute() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py new file mode 100644 index 0000000..9d78e71 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
def replace_jaas_placeholder(name, security_enabled, conf_dir):
  """
  Expand the ``_JAAS_PLACEHOLDER`` token inside a configuration value.

  :param name: configuration value (a string) that may contain the token
  :param security_enabled: when truthy the token becomes the JVM option that
    points at ``<conf_dir>/storm_jaas.conf``; otherwise the token is removed
  :param conf_dir: Storm configuration directory used to build the JAAS path;
    only read when security is enabled and the token is present
  :return: the value with every occurrence of the token substituted, or the
    value unchanged when the token does not occur
  """
  placeholder = '_JAAS_PLACEHOLDER'
  if placeholder not in name:
    return name

  if security_enabled:
    substitute = '-Djava.security.auth.login.config=' + conf_dir + '/storm_jaas.conf'
  else:
    substitute = ''
  return name.replace(placeholder, substitute)


# Jinja2 source for storm.yaml: one "key : value" line per property, sorted by
# key, skipping keys that start with an underscore. Each value is itself
# rendered as an inline template, stripped, passed through
# replace_jaas_placeholder and finally escaped for YAML.
storm_yaml_template = """{% for key, value in configurations|dictsort if not key.startswith('_') %}{{key}} : {{ escape_yaml_property(replace_jaas_placeholder(resource_management.core.source.InlineTemplate(value).get_content().strip(), security_enabled, conf_dir)) }}
{% endfor %}"""


def yaml_config_template(configurations):
  """
  Build the InlineTemplate that renders ``storm_yaml_template``.

  :param configurations: mapping of property name to value to be rendered
  :return: an InlineTemplate with the helper callables and the
    ``resource_management`` packages referenced by the template made available
  """
  # Imported locally so that the pure helper above stays importable even where
  # the Ambari agent libraries are not on the path.
  import resource_management
  from ambari_commons.yaml_utils import escape_yaml_property
  from resource_management.core.source import InlineTemplate

  template_helpers = [escape_yaml_property, replace_jaas_placeholder, resource_management,
                      resource_management.core, resource_management.core.source]
  return InlineTemplate(storm_yaml_template, configurations=configurations,
                        extra_imports=template_helpers)


def yaml_config(filename, configurations = None, conf_dir = None, owner = None, group = None):
  """
  Render ``configurations`` as "key: value" lines and write them to a file.

  :param filename: name of the file to create inside the configuration directory
  :param configurations: mapping of property name to value; every value is
    rendered as an inline template and escaped for YAML
  :param conf_dir: directory that receives the file; when None the directory
    from ``params.conf_dir`` is used
  :param owner: owner handed to the File resource
  :param group: accepted for signature compatibility; it is not applied to the
    File resource
  """
  import os
  import params
  import resource_management
  from ambari_commons.yaml_utils import escape_yaml_property
  from resource_management.core.resources.system import File
  from resource_management.core.source import InlineTemplate

  config_content = InlineTemplate('''{% for key, value in configurations_dict|dictsort %}{{ key }}: {{ escape_yaml_property(resource_management.core.source.InlineTemplate(value).get_content()) }}
{% endfor %}''', configurations_dict=configurations, extra_imports=[escape_yaml_property, resource_management, resource_management.core, resource_management.core.source])

  # Honour an explicitly supplied directory instead of silently ignoring it;
  # callers that pass nothing keep getting params.conf_dir.
  target_dir = params.conf_dir if conf_dir is None else conf_dir

  File (os.path.join(target_dir, filename),
        content = config_content,
        owner = owner,
        mode = "f"
  )
import sys
from resource_management.libraries.functions import check_process_status
from resource_management.libraries.script import Script
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import format
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature
from storm import storm
from service import service
from ambari_commons import OSConst
from ambari_commons.os_family_impl import OsFamilyImpl
from resource_management.core.resources.service import Service


class Supervisor(Script):
  """Install/configure steps shared by the OS-specific supervisor scripts."""

  def get_component_name(self):
    """Return the component name used for this script."""
    return "storm-supervisor"

  def install(self, env):
    """Install the packages, then lay down the configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Publish ``params`` to the environment and configure the supervisor."""
    import params
    env.set_params(params)
    storm("supervisor")


@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class SupervisorWindows(Supervisor):
  """Supervisor lifecycle driven through a Windows service."""

  def start(self, env):
    """Reconfigure and start the supervisor Windows service."""
    import status_params
    env.set_params(status_params)
    self.configure(env)
    win_service = status_params.supervisor_win_service_name
    Service(win_service, action="start")

  def stop(self, env):
    """Stop the supervisor Windows service."""
    import status_params
    env.set_params(status_params)
    win_service = status_params.supervisor_win_service_name
    Service(win_service, action="stop")

  def status(self, env):
    """Check the state of the supervisor Windows service."""
    import status_params
    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
    env.set_params(status_params)
    win_service = status_params.supervisor_win_service_name
    check_windows_service_status(win_service)


@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class SupervisorDefault(Supervisor):
  """Supervisor lifecycle for the default OS family."""

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Select the new configuration and binaries ahead of an upgrade restart."""
    import params
    env.set_params(params)

    # Nothing to select unless a version is set and it supports rolling upgrade.
    if not params.version or not check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      return

    conf_select.select(params.stack_name, "storm", params.version)
    for package in ("storm-client", "storm-supervisor"):
      stack_select.select(package, params.version)

  def start(self, env, upgrade_type=None):
    """Reconfigure, then start the supervisor followed by the logviewer."""
    import params
    env.set_params(params)
    self.configure(env)

    for daemon in ("supervisor", "logviewer"):
      service(daemon, action="start")

  def stop(self, env, upgrade_type=None):
    """Stop the supervisor followed by the logviewer."""
    import params
    env.set_params(params)

    for daemon in ("supervisor", "logviewer"):
      service(daemon, action="stop")

  def status(self, env):
    """Check the supervisor process through its pid file."""
    import status_params
    env.set_params(status_params)
    check_process_status(status_params.pid_supervisor)

  def get_log_folder(self):
    """Return the log directory from ``params``."""
    import params
    return params.log_dir

  def get_user(self):
    """Return the Storm user from ``params``."""
    import params
    return params.storm_user

  def get_pid_files(self):
    """Return a one-element list holding the supervisor pid file."""
    import status_params
    return [status_params.pid_supervisor]


if __name__ == "__main__":
  Supervisor().execute()
import sys
from storm import storm
from service import service
from supervisord_service import supervisord_service, supervisord_check_status
from resource_management.libraries.script import Script
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import format
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature


class Supervisor(Script):
  """Storm supervisor whose process is controlled through supervisorctl."""

  def get_component_name(self):
    """Return the component name used for this script."""
    return "storm-supervisor"

  def install(self, env):
    """Install the packages, then lay down the configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Publish ``params`` to the environment and run the Storm configuration."""
    import params
    env.set_params(params)
    storm()

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Select the new configuration and binaries ahead of an upgrade restart."""
    import params
    env.set_params(params)

    # Nothing to select unless a version is set and it supports rolling upgrade.
    if not params.version or not check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      return

    conf_select.select(params.stack_name, "storm", params.version)
    for package in ("storm-client", "storm-supervisor"):
      stack_select.select(package, params.version)

  def start(self, env, upgrade_type=None):
    """Reconfigure, start the supervisor via supervisorctl, then the logviewer."""
    import params
    env.set_params(params)
    self.configure(env)

    lifecycle_action = "start"
    supervisord_service("supervisor", action=lifecycle_action)
    service("logviewer", action=lifecycle_action)

  def stop(self, env, upgrade_type=None):
    """Stop the supervisor via supervisorctl, then the logviewer."""
    import params
    env.set_params(params)

    lifecycle_action = "stop"
    supervisord_service("supervisor", action=lifecycle_action)
    service("logviewer", action=lifecycle_action)

  def status(self, env):
    """Delegate the status check to supervisorctl; ``env`` is not used."""
    supervisord_check_status("supervisor")

  def get_log_folder(self):
    """Return the log directory from ``params``."""
    import params
    return params.log_dir

  def get_user(self):
    """Return the Storm user from ``params``."""
    import params
    return params.storm_user


if __name__ == "__main__":
  Supervisor().execute()
from resource_management.core.exceptions import ComponentIsNotRunning, Fail
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.format import format


def supervisord_service(component_name, action):
  """
  Run ``supervisorctl <action> storm-<component_name>``.

  The command is launched with ``wait_for_finish=False``.

  :param component_name: Storm component suffix, e.g. "supervisor"
  :param action: supervisorctl verb, e.g. "start" or "stop"
  """
  # format() fills {action} and {component_name} from this function's
  # arguments, so the parameter names must match the placeholders.
  Execute(format("supervisorctl {action} storm-{component_name}"),
    wait_for_finish=False
  )


def supervisord_check_status(component_name):
  """
  Check that supervisorctl reports the Storm component as RUNNING.

  :param component_name: Storm component suffix, e.g. "supervisor"
  :raises ComponentIsNotRunning: when the status command fails, i.e. grep
    finds no RUNNING line in the supervisorctl output
  """
  try:
    Execute(format("supervisorctl status storm-{component_name} | grep RUNNING"))
  except Fail:
    # Fail and ComponentIsNotRunning must be imported for this translation to
    # work; without the import a failed check surfaces as a NameError.
    raise ComponentIsNotRunning()
import sys
from storm import storm
from service import service
from service_check import ServiceCheck
from resource_management.libraries.functions import check_process_status
from resource_management.libraries.script import Script
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import format
from resource_management.core.resources.system import Link
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
  FILE_TYPE_JAAS_CONF
from setup_ranger_storm import setup_ranger_storm
from ambari_commons import OSConst
from ambari_commons.os_family_impl import OsFamilyImpl
from resource_management.core.resources.service import Service


class UiServer(Script):
  """Install/configure steps shared by the OS-specific Storm UI scripts."""

  def get_component_name(self):
    """Return the component name used for this script."""
    return "storm-client"

  def install(self, env):
    """Install the packages, then lay down the configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Publish ``params`` to the environment and configure the UI."""
    import params
    env.set_params(params)
    storm("ui")


@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class UiServerWindows(UiServer):
  """Storm UI lifecycle driven through a Windows service."""

  def start(self, env):
    """Reconfigure and start the UI Windows service."""
    import status_params
    env.set_params(status_params)
    self.configure(env)
    win_service = status_params.ui_win_service_name
    Service(win_service, action="start")

  def stop(self, env):
    """Stop the UI Windows service."""
    import status_params
    env.set_params(status_params)
    win_service = status_params.ui_win_service_name
    Service(win_service, action="stop")

  def status(self, env):
    """Check the state of the UI Windows service."""
    import status_params
    env.set_params(status_params)
    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
    win_service = status_params.ui_win_service_name
    check_windows_service_status(win_service)


@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class UiServerDefault(UiServer):
  """Storm UI lifecycle for the default OS family."""

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Select the new configuration and client binaries before an upgrade restart."""
    import params
    env.set_params(params)

    # Nothing to select unless a version is set and it supports rolling upgrade.
    if not params.version or not check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      return

    conf_select.select(params.stack_name, "storm", params.version)
    stack_select.select("storm-client", params.version)

  def link_metrics_sink_jar(self):
    """Replace stale sink symlinks and link the metrics sink JAR into the Storm lib dir."""
    import params
    # Add storm metrics reporter JAR to storm-ui-server classpath.
    # Remove symlinks. They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2
    Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
         action="delete")
    # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
    Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")

    # The local must stay named sink_jar: the format() calls below refer to
    # it through the {sink_jar} placeholder.
    use_apache_classes = check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES,
                                             params.version_for_stack_feature_checks)
    sink_jar = params.metric_collector_sink_jar if use_apache_classes else params.metric_collector_legacy_sink_jar

    # Link only when the sink JAR exists and the link is not already present.
    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            only_if=format("ls {sink_jar}")
    )

  def start(self, env, upgrade_type=None):
    """Reconfigure, link the metrics sink, set up Ranger and start the UI."""
    import params
    env.set_params(params)
    self.configure(env)
    self.link_metrics_sink_jar()
    setup_ranger_storm(upgrade_type=upgrade_type)
    service("ui", action="start")

  def stop(self, env, upgrade_type=None):
    """Stop the UI."""
    import params
    env.set_params(params)
    service("ui", action="stop")

  def status(self, env):
    """Check the UI process through its pid file."""
    import status_params
    env.set_params(status_params)
    check_process_status(status_params.pid_ui)

  def get_log_folder(self):
    """Return the log directory from ``params``."""
    import params
    return params.log_dir

  def get_user(self):
    """Return the Storm user from ``params``."""
    import params
    return params.storm_user

  def get_pid_files(self):
    """Return a one-element list holding the UI pid file."""
    import status_params
    return [status_params.pid_ui]


if __name__ == "__main__":
  UiServer().execute()
