Repository: ambari Updated Branches: refs/heads/trunk 1aad067cf -> 3817ad5da
http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py new file mode 100644 index 0000000..680dd32 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import collections +import os + +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.resources.properties_file import PropertiesFile +from resource_management.libraries.resources.template_config import TemplateConfig +from resource_management.core.resources.system import Directory, Execute, File, Link +from resource_management.core.source import StaticFile, Template, InlineTemplate +from resource_management.libraries.functions import format +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions import Direction + + +from resource_management.core.logger import Logger + + +def kafka(upgrade_type=None): + import params + ensure_base_directories() + + kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker']) + # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2. + # Starting in HDP 2.3, Kafka handles the generation of broker.id so Ambari doesn't have to. + + effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version) + Logger.info(format("Effective stack version: {effective_version}")) + + # In HDP-2.2 (Apache Kafka 0.8.1.1) we used to generate broker.ids based on hosts and add them to + # kafka's server.properties. In future version brokers can generate their own ids based on zookeeper seq + # We need to preserve the broker.id when user is upgrading from HDP-2.2 to any higher version. + # Once its preserved it will be written to kafka.log.dirs/meta.properties and it will be used from there on + # similarly we need preserve port as well during the upgrade + + if upgrade_type is not None and params.upgrade_direction == Direction.UPGRADE and \ + check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, params.current_version) and \ + check_stack_feature(StackFeature.KAFKA_LISTENERS, params.version): + if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: + brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) + kafka_server_config['broker.id'] = brokerid + Logger.info(format("Calculating broker.id as {brokerid}")) + if 'port' in kafka_server_config: + port = kafka_server_config['port'] + Logger.info(format("Port config from previous verson: {port}")) + listeners = kafka_server_config['listeners'] + kafka_server_config['listeners'] = listeners.replace("6667", port) + Logger.info(format("Kafka listeners after the port update: {listeners}")) + del kafka_server_config['port'] + + + if effective_version is not None and effective_version != "" and \ + check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version): + if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: + brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) + kafka_server_config['broker.id'] = brokerid + Logger.info(format("Calculating broker.id as {brokerid}")) + + # listeners and advertised.listeners are only added in 2.3.0.0 onwards. + if effective_version is not None and effective_version != "" and \ + check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version): + + listeners = kafka_server_config['listeners'].replace("localhost", params.hostname) + Logger.info(format("Kafka listeners: {listeners}")) + kafka_server_config['listeners'] = listeners + + if params.security_enabled and params.kafka_kerberos_enabled: + Logger.info("Kafka kerberos security is enabled.") + kafka_server_config['advertised.listeners'] = listeners + Logger.info(format("Kafka advertised listeners: {listeners}")) + elif 'advertised.listeners' in kafka_server_config: + advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname) + kafka_server_config['advertised.listeners'] = advertised_listeners + Logger.info(format("Kafka advertised listeners: {advertised_listeners}")) + else: + kafka_server_config['host.name'] = params.hostname + + if params.has_metric_collector: + kafka_server_config['kafka.timeline.metrics.hosts'] = params.ams_collector_hosts + kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port + kafka_server_config['kafka.timeline.metrics.protocol'] = params.metric_collector_protocol + kafka_server_config['kafka.timeline.metrics.truststore.path'] = params.metric_truststore_path + kafka_server_config['kafka.timeline.metrics.truststore.type'] = params.metric_truststore_type + kafka_server_config['kafka.timeline.metrics.truststore.password'] = params.metric_truststore_password + + kafka_data_dir = kafka_server_config['log.dirs'] + kafka_data_dirs = filter(None, kafka_data_dir.split(",")) + Directory(kafka_data_dirs, + mode=0755, + cd_access='a', + owner=params.kafka_user, + group=params.user_group, + create_parents = True, + recursive_ownership = True, + ) + + PropertiesFile("server.properties", + dir=params.conf_dir, + properties=kafka_server_config, + owner=params.kafka_user, + group=params.user_group, + ) + + File(format("{conf_dir}/kafka-env.sh"), + owner=params.kafka_user, + content=InlineTemplate(params.kafka_env_sh_template) + ) + + if (params.log4j_props != None): + File(format("{conf_dir}/log4j.properties"), + mode=0644, + group=params.user_group, + owner=params.kafka_user, + content=InlineTemplate(params.log4j_props) + ) + + if params.security_enabled and params.kafka_kerberos_enabled: + if params.kafka_jaas_conf_template: + File(format("{conf_dir}/kafka_jaas.conf"), + owner=params.kafka_user, + content=InlineTemplate(params.kafka_jaas_conf_template) + ) + else: + TemplateConfig(format("{conf_dir}/kafka_jaas.conf"), + owner=params.kafka_user) + + if params.kafka_client_jaas_conf_template: + File(format("{conf_dir}/kafka_client_jaas.conf"), + owner=params.kafka_user, + content=InlineTemplate(params.kafka_client_jaas_conf_template) + ) + else: + TemplateConfig(format("{conf_dir}/kafka_client_jaas.conf"), + owner=params.kafka_user) + + # On some OS this folder could be not exists, so we will create it before pushing there files + Directory(params.limits_conf_dir, + create_parents = True, + owner='root', + group='root' + ) + + File(os.path.join(params.limits_conf_dir, 'kafka.conf'), + owner='root', + group='root', + mode=0644, + content=Template("kafka.conf.j2") + ) + + File(os.path.join(params.conf_dir, 'tools-log4j.properties'), + owner='root', + group='root', + mode=0644, + content=Template("tools-log4j.properties.j2") + ) + + setup_symlink(params.kafka_managed_pid_dir, params.kafka_pid_dir) + setup_symlink(params.kafka_managed_log_dir, params.kafka_log_dir) + + +def mutable_config_dict(kafka_broker_config): + kafka_server_config = {} + for key, value in kafka_broker_config.iteritems(): + kafka_server_config[key] = value + return kafka_server_config + + +# Used to workaround the hardcoded pid/log dir used on the kafka bash process launcher +def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir): + import params + backup_folder_path = None + backup_folder_suffix = "_tmp" + if kafka_ambari_managed_dir != kafka_managed_dir: + if os.path.exists(kafka_managed_dir) and not os.path.islink(kafka_managed_dir): + + # Backup existing data before delete if config is changed repeatedly to/from default location at any point in time time, as there may be relevant contents (historic logs) + backup_folder_path = backup_dir_contents(kafka_managed_dir, backup_folder_suffix) + + Directory(kafka_managed_dir, + action="delete", + create_parents = True) + + elif os.path.islink(kafka_managed_dir) and os.path.realpath(kafka_managed_dir) != kafka_ambari_managed_dir: + Link(kafka_managed_dir, + action="delete") + + if not os.path.islink(kafka_managed_dir): + Link(kafka_managed_dir, + to=kafka_ambari_managed_dir) + + elif os.path.islink(kafka_managed_dir): # If config is changed and coincides with the kafka managed dir, remove the symlink and physically create the folder + Link(kafka_managed_dir, + action="delete") + + Directory(kafka_managed_dir, + mode=0755, + cd_access='a', + owner=params.kafka_user, + group=params.user_group, + create_parents = True, + recursive_ownership = True, + ) + + if backup_folder_path: + # Restore backed up files to current relevant dirs if needed - will be triggered only when changing to/from default path; + for file in os.listdir(backup_folder_path): + if os.path.isdir(os.path.join(backup_folder_path, file)): + Execute(('cp', '-r', os.path.join(backup_folder_path, file), kafka_managed_dir), + sudo=True) + Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(kafka_managed_dir, file)), + sudo=True) + else: + File(os.path.join(kafka_managed_dir,file), + owner=params.kafka_user, + content = StaticFile(os.path.join(backup_folder_path,file))) + + # Clean up backed up folder + Directory(backup_folder_path, + action="delete", + create_parents = True) + + +# Uses agent temp dir to store backup files +def backup_dir_contents(dir_path, backup_folder_suffix): + import params + backup_destination_path = params.tmp_dir + os.path.normpath(dir_path)+backup_folder_suffix + Directory(backup_destination_path, + mode=0755, + cd_access='a', + owner=params.kafka_user, + group=params.user_group, + create_parents = True, + recursive_ownership = True, + ) + # Safely copy top-level contents to backup folder + for file in os.listdir(dir_path): + if os.path.isdir(os.path.join(dir_path, file)): + Execute(('cp', '-r', os.path.join(dir_path, file), backup_destination_path), + sudo=True) + Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(backup_destination_path, file)), + sudo=True) + else: + File(os.path.join(backup_destination_path, file), + owner=params.kafka_user, + content = StaticFile(os.path.join(dir_path,file))) + + return backup_destination_path + +def ensure_base_directories(): + import params + Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir], + mode=0755, + cd_access='a', + owner=params.kafka_user, + group=params.user_group, + create_parents = True, + recursive_ownership = True, + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py new file mode 100644 index 0000000..81715f9 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py @@ -0,0 +1,151 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from resource_management import Script +from resource_management.core.logger import Logger +from resource_management.core.resources.system import Execute, File, Directory +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import Direction +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.show_logs import show_logs +from kafka import ensure_base_directories + +import upgrade +from kafka import kafka +from setup_ranger_kafka import setup_ranger_kafka + +class KafkaBroker(Script): + + def get_component_name(self): + return "kafka-broker" + + def install(self, env): + self.install_packages(env) + + def configure(self, env, upgrade_type=None): + import params + env.set_params(params) + kafka(upgrade_type=upgrade_type) + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + env.set_params(params) + + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + stack_select.select("kafka-broker", params.version) + + if params.version and check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version): + conf_select.select(params.stack_name, "kafka", params.version) + + # This is extremely important since it should only be called if crossing the HDP 2.3.4.0 boundary. + if params.current_version and params.version and params.upgrade_direction: + src_version = dst_version = None + if params.upgrade_direction == Direction.UPGRADE: + src_version = format_stack_version(params.current_version) + dst_version = format_stack_version(params.version) + else: + # These represent the original values during the UPGRADE direction + src_version = format_stack_version(params.version) + dst_version = format_stack_version(params.downgrade_from_version) + + if not check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, src_version) and check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, dst_version): + # Calling the acl migration script requires the configs to be present. + self.configure(env, upgrade_type=upgrade_type) + upgrade.run_migration(env, upgrade_type) + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + self.configure(env, upgrade_type=upgrade_type) + + if params.security_enabled: + if params.version and check_stack_feature(StackFeature.KAFKA_KERBEROS, params.version): + kafka_kinit_cmd = format("{kinit_path_local} -kt {kafka_keytab_path} {kafka_jaas_principal};") + Execute(kafka_kinit_cmd, user=params.kafka_user) + + if params.is_supported_kafka_ranger: + setup_ranger_kafka() #Ranger Kafka Plugin related call + daemon_cmd = format('source {params.conf_dir}/kafka-env.sh ; {params.kafka_bin} start') + no_op_test = format('ls {params.kafka_pid_file} >/dev/null 2>&1 && ps -p `cat {params.kafka_pid_file}` >/dev/null 2>&1') + try: + Execute(daemon_cmd, + user=params.kafka_user, + not_if=no_op_test + ) + except: + show_logs(params.kafka_log_dir, params.kafka_user) + raise + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + # Kafka package scripts change permissions on folders, so we have to + # restore permissions after installing repo version bits + # before attempting to stop Kafka Broker + ensure_base_directories() + daemon_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop') + try: + Execute(daemon_cmd, + user=params.kafka_user, + ) + except: + show_logs(params.kafka_log_dir, params.kafka_user) + raise + File(params.kafka_pid_file, + action = "delete" + ) + + def disable_security(self, env): + import params + if not params.zookeeper_connect: + Logger.info("No zookeeper connection string. Skipping reverting ACL") + return + if not params.secure_acls: + Logger.info("The zookeeper.set.acl is false. Skipping reverting ACL") + return + Execute( + "{0} --zookeeper.connect {1} --zookeeper.acl=unsecure".format(params.kafka_security_migrator, params.zookeeper_connect), \ + user=params.kafka_user, \ + environment={ 'JAVA_HOME': params.java64_home }, \ + logoutput=True, \ + tries=3) + + def status(self, env): + import status_params + env.set_params(status_params) + check_process_status(status_params.kafka_pid_file) + + def get_log_folder(self): + import params + return params.kafka_log_dir + + def get_user(self): + import params + return params.kafka_user + + def get_pid_files(self): + import status_params + return [status_params.kafka_pid_file] + +if __name__ == "__main__": + KafkaBroker().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py new file mode 100644 index 0000000..5b0be54 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +from resource_management.libraries.functions import format +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.stack_features import get_stack_feature_version +from resource_management.libraries.functions.default import default +from utils import get_bare_principal +from resource_management.libraries.functions.get_stack_version import get_stack_version +from resource_management.libraries.functions.is_empty import is_empty +import status_params +from resource_management.libraries.resources.hdfs_resource import HdfsResource +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import get_kinit_path +from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources +from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config + +# server configurations +config = Script.get_config() +tmp_dir = Script.get_tmp_dir() +stack_root = Script.get_stack_root() +stack_name = default("/hostLevelParams/stack_name", None) +retryAble = default("/commandParams/command_retry_enabled", False) + +# Version being upgraded/downgraded to +version = default("/commandParams/version", None) + +# Version that is CURRENT. +current_version = default("/hostLevelParams/current_version", None) + + +stack_version_unformatted = config['hostLevelParams']['stack_version'] +stack_version_formatted = format_stack_version(stack_version_unformatted) +upgrade_direction = default("/commandParams/upgrade_direction", None) + +# get the correct version to use for checking stack features +version_for_stack_feature_checks = get_stack_feature_version(config) + +stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks) +stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks) +stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, version_for_stack_feature_checks) + +# When downgrading the 'version' and 'current_version' are both pointing to the downgrade-target version +# downgrade_from_version provides the source-version the downgrade is happening from +downgrade_from_version = default("/commandParams/downgrade_from_version", None) + +hostname = config['hostname'] + +# default kafka parameters +kafka_home = '/usr/lib/kafka' +kafka_bin = kafka_home+'/bin/kafka' +conf_dir = "/etc/kafka/conf" +limits_conf_dir = "/etc/security/limits.d" + +# Used while upgrading the stack in a kerberized cluster and running kafka-acls.sh +zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", None) + +kafka_user_nofile_limit = default('/configurations/kafka-env/kafka_user_nofile_limit', 128000) +kafka_user_nproc_limit = default('/configurations/kafka-env/kafka_user_nproc_limit', 65536) + +# parameters for 2.2+ +if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): + kafka_home = os.path.join(stack_root, "current", "kafka-broker") + kafka_bin = os.path.join(kafka_home, "bin", "kafka") + conf_dir = os.path.join(kafka_home, "config") + +kafka_user = config['configurations']['kafka-env']['kafka_user'] +kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir'] +kafka_pid_dir = status_params.kafka_pid_dir +kafka_pid_file = kafka_pid_dir+"/kafka.pid" +# This is hardcoded on the kafka bash process lifecycle on which we have no control over +kafka_managed_pid_dir = "/var/run/kafka" +kafka_managed_log_dir = "/var/log/kafka" +user_group = config['configurations']['cluster-env']['user_group'] +java64_home = config['hostLevelParams']['java_home'] +kafka_env_sh_template = config['configurations']['kafka-env']['content'] +kafka_jaas_conf_template = default("/configurations/kafka_jaas_conf/content", None) +kafka_client_jaas_conf_template = default("/configurations/kafka_client_jaas_conf/content", None) +kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts'] +kafka_hosts.sort() + +zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts'] +zookeeper_hosts.sort() +secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False) +kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh") + +#Kafka log4j +kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize',256) +kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex',20) +controller_log_maxfilesize = default('/configurations/kafka-log4j/controller_log_maxfilesize',256) +controller_log_maxbackupindex = default('/configurations/kafka-log4j/controller_log_maxbackupindex',20) + +if (('kafka-log4j' in config['configurations']) and ('content' in config['configurations']['kafka-log4j'])): + log4j_props = config['configurations']['kafka-log4j']['content'] +else: + log4j_props = None + +if 'ganglia_server_host' in config['clusterHostInfo'] and \ + len(config['clusterHostInfo']['ganglia_server_host'])>0: + ganglia_installed = True + ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0] + ganglia_report_interval = 60 +else: + ganglia_installed = False + +metric_collector_port = "" +metric_collector_protocol = "" +metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "") +metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "") +metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "") + +set_instanceId = "false" +cluster_name = config["clusterName"] + +if 'cluster-env' in config['configurations'] and \ + 'metrics_collector_external_hosts' in config['configurations']['cluster-env']: + ams_collector_hosts = config['configurations']['cluster-env']['metrics_collector_external_hosts'] + set_instanceId = "true" +else: + ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", [])) + +has_metric_collector = not len(ams_collector_hosts) == 0 + +if has_metric_collector: + if 'cluster-env' in config['configurations'] and \ + 'metrics_collector_external_port' in config['configurations']['cluster-env']: + metric_collector_port = config['configurations']['cluster-env']['metrics_collector_external_port'] + else: + metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188") + if metric_collector_web_address.find(':') != -1: + metric_collector_port = metric_collector_web_address.split(':')[1] + else: + metric_collector_port = '6188' + if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY": + metric_collector_protocol = 'https' + else: + metric_collector_protocol = 'http' + pass + +# Security-related params +security_enabled = config['configurations']['cluster-env']['security_enabled'] +kafka_kerberos_enabled = (('security.inter.broker.protocol' in config['configurations']['kafka-broker']) and + ((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL") or + (config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "SASL_PLAINTEXT"))) + + +if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \ + and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted): + _hostname_lowercase = config['hostname'].lower() + _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name'] + kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase) + kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab'] + kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name) + kafka_kerberos_params = "-Djava.security.auth.login.config="+ conf_dir +"/kafka_jaas.conf" +else: + kafka_kerberos_params = '' + kafka_jaas_principal = None + kafka_keytab_path = None + +# for curl command in ranger plugin to get db connector +jdk_location = config['hostLevelParams']['jdk_location'] + +# ranger kafka plugin section start + +# ranger host +ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", []) +has_ranger_admin = not len(ranger_admin_hosts) == 0 + +# ranger support xml_configuration flag, instead of depending on ranger xml_configurations_supported/ranger-env, using stack feature +xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks) + +# ambari-server hostname +ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0] + +ranger_admin_log_dir = default("/configurations/ranger-env/ranger_admin_log_dir","/var/log/ranger/admin") + +# ranger kafka plugin enabled property +enable_ranger_kafka = default("configurations/ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled", "No") +enable_ranger_kafka = True if enable_ranger_kafka.lower() == 'yes' else False + +# ranger kafka-plugin supported flag, instead of dependending on is_supported_kafka_ranger/kafka-env.xml, using stack feature +is_supported_kafka_ranger = check_stack_feature(StackFeature.KAFKA_RANGER_PLUGIN_SUPPORT, version_for_stack_feature_checks) + +# ranger kafka properties +if enable_ranger_kafka and is_supported_kafka_ranger: + # get ranger policy url + policymgr_mgr_url = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.policy.rest.url'] + + if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'): + policymgr_mgr_url = policymgr_mgr_url.rstrip('/') + + # ranger audit db user + xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger') + + xa_audit_db_password = '' + if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin: + xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password'] + + # ranger kafka service/repository name + repo_name = str(config['clusterName']) + '_kafka' + repo_name_value = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.service.name'] + if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}": + repo_name = repo_name_value + + ranger_env = config['configurations']['ranger-env'] + + # create ranger-env config having external ranger credential properties + if not has_ranger_admin and enable_ranger_kafka: + external_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_admin_username', 'admin') + external_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_admin_password', 'admin') + external_ranger_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin') + external_ranger_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin') + ranger_env = {} + ranger_env['admin_username'] = external_admin_username + ranger_env['admin_password'] = external_admin_password + ranger_env['ranger_admin_username'] = external_ranger_admin_username + ranger_env['ranger_admin_password'] = external_ranger_admin_password + + ranger_plugin_properties = config['configurations']['ranger-kafka-plugin-properties'] + ranger_kafka_audit = config['configurations']['ranger-kafka-audit'] + ranger_kafka_audit_attrs = config['configuration_attributes']['ranger-kafka-audit'] + ranger_kafka_security = config['configurations']['ranger-kafka-security'] + ranger_kafka_security_attrs = config['configuration_attributes']['ranger-kafka-security'] + ranger_kafka_policymgr_ssl = config['configurations']['ranger-kafka-policymgr-ssl'] + ranger_kafka_policymgr_ssl_attrs = config['configuration_attributes']['ranger-kafka-policymgr-ssl'] + + policy_user = config['configurations']['ranger-kafka-plugin-properties']['policy_user'] + + ranger_plugin_config = { + 'username' : config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'], + 'password' : config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD'], + 'zookeeper.connect' : config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'], + 'commonNameForCertificate' : config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate'] + } + + kafka_ranger_plugin_repo = { + 'isEnabled': 'true', + 'configs': ranger_plugin_config, + 'description': 'kafka repo', + 'name': repo_name, + 'repositoryType': 'kafka', + 'type': 'kafka', + 'assetType': '1' + } + + custom_ranger_service_config = generate_ranger_service_config(ranger_plugin_properties) + if len(custom_ranger_service_config) > 0: + ranger_plugin_config.update(custom_ranger_service_config) + + if stack_supports_ranger_kerberos and security_enabled: + ranger_plugin_config['policy.download.auth.users'] = kafka_user + ranger_plugin_config['tag.download.auth.users'] = kafka_user + ranger_plugin_config['ambari.service.check.user'] = policy_user + + downloaded_custom_connector = None + previous_jdbc_jar_name = None + driver_curl_source = None + driver_curl_target = None + previous_jdbc_jar = None + + if has_ranger_admin and stack_supports_ranger_audit_db: + xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR'] + jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config) + + downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None + driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None + driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None + previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}") if stack_supports_ranger_audit_db else None + + xa_audit_db_is_enabled = False + if xml_configurations_supported and stack_supports_ranger_audit_db: + xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db'] + + xa_audit_hdfs_is_enabled = default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', False) + ssl_keystore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None + ssl_truststore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None + credential_file = format('/etc/ranger/{repo_name}/cred.jceks') + + stack_version = get_stack_version('kafka-broker') + setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh') + setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh") + + # for SQLA explicitly disable audit to DB for Ranger + if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla': + xa_audit_db_is_enabled = False + +# need this to capture cluster name from where ranger kafka plugin is enabled +cluster_name = config['clusterName'] + +# ranger kafka plugin section end + +namenode_hosts = default("/clusterHostInfo/namenode_host", []) +has_namenode = not len(namenode_hosts) == 0 + +hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None +hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None +hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None +default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None +hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None +hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None +kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) + +import functools +#create partial functions with common arguments for every HdfsResource call +#to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code +HdfsResource = functools.partial( + HdfsResource, + user=hdfs_user, + hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore", + security_enabled = security_enabled, + keytab = hdfs_user_keytab, + kinit_path_local = kinit_path_local, + hadoop_bin_dir = hadoop_bin_dir, + hadoop_conf_dir = hadoop_conf_dir, + principal_name = hdfs_principal_name, + hdfs_site = hdfs_site, + default_fs = default_fs, + immutable_paths = get_not_managed_resources() +) http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py new file mode 100644 index 0000000..0f3a417 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.validate import call_and_match_output +from resource_management.libraries.functions.format import format +from resource_management.core.logger import Logger +from resource_management.core import sudo +import subprocess + +class ServiceCheck(Script): + def service_check(self, env): + import params + env.set_params(params) + + # TODO, Kafka Service check should be more robust , It should get all the broker_hosts + # Produce some messages and check if consumer reads same no.of messages. + + kafka_config = self.read_kafka_config() + topic = "ambari_kafka_service_check" + create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"." + create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists." + source_cmd = format("source {conf_dir}/kafka-env.sh") + topic_exists_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --topic {topic} --list") + topic_exists_cmd_p = subprocess.Popen(topic_exists_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + topic_exists_cmd_out, topic_exists_cmd_err = topic_exists_cmd_p.communicate() + # run create topic command only if the topic doesn't exists + if topic not in topic_exists_cmd_out: + create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1") + command = source_cmd + " ; " + create_topic_cmd + Logger.info("Running kafka create topic command: %s" % command) + call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user) + + def read_kafka_config(self): + import params + + kafka_config = {} + content = sudo.read_file(params.conf_dir + "/server.properties") + for line in content.splitlines(): + if line.startswith("#") or not line.strip(): + continue + + key, value = line.split("=") + kafka_config[key] = value.replace("\n", "") + + return kafka_config + +if __name__ == "__main__": + ServiceCheck().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py new file mode 100644 index 0000000..e9719aa --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from resource_management.core.logger import Logger +from resource_management.core.resources import File, Execute +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_core_site_for_required_plugins + +def setup_ranger_kafka(): + import params + + if params.enable_ranger_kafka: + + from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin + + if params.retryAble: + Logger.info("Kafka: Setup ranger: command retry enables thus retrying if ranger admin is down !") + else: + Logger.info("Kafka: Setup ranger: command retry not enabled thus skipping if ranger admin is down !") + + if params.xml_configurations_supported and params.enable_ranger_kafka and params.xa_audit_hdfs_is_enabled: + if params.has_namenode: + params.HdfsResource("/ranger/audit", + type="directory", + action="create_on_execute", + owner=params.hdfs_user, + group=params.hdfs_user, + mode=0755, + recursive_chmod=True + ) + params.HdfsResource("/ranger/audit/kafka", + type="directory", + action="create_on_execute", + owner=params.kafka_user, + group=params.kafka_user, + mode=0700, + recursive_chmod=True + ) + params.HdfsResource(None, action="execute") + + setup_ranger_plugin('kafka-broker', 'kafka', params.previous_jdbc_jar, + params.downloaded_custom_connector, params.driver_curl_source, + params.driver_curl_target, params.java64_home, + params.repo_name, params.kafka_ranger_plugin_repo, + params.ranger_env, params.ranger_plugin_properties, + params.policy_user, params.policymgr_mgr_url, + params.enable_ranger_kafka, conf_dict=params.conf_dir, + component_user=params.kafka_user, component_group=params.user_group, cache_service_list=['kafka'], + plugin_audit_properties=params.ranger_kafka_audit, plugin_audit_attributes=params.ranger_kafka_audit_attrs, + plugin_security_properties=params.ranger_kafka_security, plugin_security_attributes=params.ranger_kafka_security_attrs, + plugin_policymgr_ssl_properties=params.ranger_kafka_policymgr_ssl, plugin_policymgr_ssl_attributes=params.ranger_kafka_policymgr_ssl_attrs, + component_list=['kafka-broker'], audit_db_is_enabled=params.xa_audit_db_is_enabled, + credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password, + ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password, + api_version = 'v2', skip_if_rangeradmin_down= not params.retryAble, + is_security_enabled = params.security_enabled, + is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos, + component_user_principal=params.kafka_jaas_principal if params.security_enabled else None, + component_user_keytab=params.kafka_keytab_path if params.security_enabled else None) + + if params.enable_ranger_kafka: + Execute(('cp', '--remove-destination', params.setup_ranger_env_sh_source, params.setup_ranger_env_sh_target), + not_if=format("test -f {setup_ranger_env_sh_target}"), + sudo=True + ) + File(params.setup_ranger_env_sh_target, + owner = params.kafka_user, + group = params.user_group, + mode = 0755 + ) + if params.stack_supports_core_site_for_ranger_plugin and params.enable_ranger_kafka and params.has_namenode and params.security_enabled: + Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating create core-site.xml from namenode configuraitions") + setup_core_site_for_required_plugins(component_user=params.kafka_user,component_group=params.user_group,create_core_site_path = params.conf_dir, config = params.config) + else: + Logger.info("Stack does not support core-site.xml creation for Ranger plugin, skipping core-site.xml configurations") + else: + Logger.info('Ranger Kafka plugin is not enabled') http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py new file mode 100644 index 0000000..57bdf5e --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from resource_management.libraries.functions import format +from resource_management.libraries.script.script import Script + +config = Script.get_config() + +kafka_pid_dir = config['configurations']['kafka-env']['kafka_pid_dir'] +kafka_pid_file = format("{kafka_pid_dir}/kafka.pid") http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py new file mode 100644 index 0000000..b6e4046 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py @@ -0,0 +1,78 @@ + +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os + + +from resource_management.core.resources.system import Execute +from resource_management.libraries.functions import format +from resource_management.libraries.functions import Direction +from resource_management.core.exceptions import Fail +from resource_management.core.logger import Logger + +def run_migration(env, upgrade_type): + """ + If the acl migration script is present, then run it for either upgrade or downgrade. + That script was introduced in HDP 2.3.4.0 and requires stopping all Kafka brokers first. + Requires configs to be present. + :param env: Environment. + :param upgrade_type: "rolling" or "nonrolling + """ + import params + + if upgrade_type is None: + raise Fail('Parameter "upgrade_type" is missing.') + + if params.upgrade_direction is None: + raise Fail('Parameter "upgrade_direction" is missing.') + + if params.upgrade_direction == Direction.DOWNGRADE and params.downgrade_from_version is None: + raise Fail('Parameter "downgrade_from_version" is missing.') + + if not params.security_enabled: + Logger.info("Skip running the Kafka ACL migration script since cluster security is not enabled.") + return + + Logger.info("Upgrade type: {0}, direction: {1}".format(str(upgrade_type), params.upgrade_direction)) + + # If the schema upgrade script exists in the version upgrading to, then attempt to upgrade/downgrade it while still using the present bits. + kafka_acls_script = None + command_suffix = "" + if params.upgrade_direction == Direction.UPGRADE: + kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh") + command_suffix = "--upgradeAcls" + elif params.upgrade_direction == Direction.DOWNGRADE: + kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh") + command_suffix = "--downgradeAcls" + + if kafka_acls_script is not None: + if os.path.exists(kafka_acls_script): + Logger.info("Found Kafka acls script: {0}".format(kafka_acls_script)) + if params.zookeeper_connect is None: + raise Fail("Could not retrieve property kafka-broker/zookeeper.connect") + + acls_command = "{0} --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect={1} {2}".\ + format(kafka_acls_script, params.zookeeper_connect, command_suffix) + + Execute(acls_command, + user=params.kafka_user, + logoutput=True) + else: + Logger.info("Did not find Kafka acls script: {0}".format(kafka_acls_script)) http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py new file mode 100644 index 0000000..2f1fa5e --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import re + +def get_bare_principal(normalized_principal_name): + """ + Given a normalized principal name (nimbus/[email protected]) returns just the + primary component (nimbus) + :param normalized_principal_name: a string containing the principal name to process + :return: a string containing the primary component value or None if not valid + """ + + bare_principal = None + + if normalized_principal_name: + match = re.match(r"([^/@]+)(?:/[^@])?(?:@.*)?", normalized_principal_name) + + if match: + bare_principal = match.group(1) + + return bare_principal http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2 new file mode 100644 index 0000000..5b8f896 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2 @@ -0,0 +1,92 @@ +{# + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + #} +{ + "input":[ + { + "type":"kafka_controller", + "rowtype":"service", + "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/controller.log" + }, + { + "type":"kafka_request", + "rowtype":"service", + "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/kafka-request.log" + }, + { + "type":"kafka_logcleaner", + "rowtype":"service", + "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/log-cleaner.log" + }, + { + "type":"kafka_server", + "rowtype":"service", + "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/server.log" + }, + { + "type":"kafka_statechange", + "rowtype":"service", + "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/state-change.log" + } + ], + "filter":[ + { + "filter":"grok", + "conditions":{ + "fields":{ + "type":[ + "kafka_controller", + "kafka_request", + "kafka_logcleaner" + ] + } + }, + "log4j_format":"[%d] %p %m (%c)%n", + "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])", + "message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{GREEDYDATA:log_message}", + "post_map_values":{ + "logtime":{ + "map_date":{ + "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS" + } + } + } + }, + { + "filter":"grok", + "comment":"Suppose to be same log4j pattern as other kafka processes, but some reason thread is not printed", + "conditions":{ + "fields":{ + "type":[ + "kafka_server", + "kafka_statechange" + ] + } + }, + "log4j_format":"[%d] %p %m (%c)%n", + "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])", + "message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}", + "post_map_values":{ + "logtime":{ + "map_date":{ + "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS" + } + } + } + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2 new file mode 100644 index 0000000..9e18e1d --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2 @@ -0,0 +1,35 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{kafka_user}} - nofile {{kafka_user_nofile_limit}} +{{kafka_user}} - nproc {{kafka_user_nproc_limit}} http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2 new file mode 100644 index 0000000..7f81d85 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2 @@ -0,0 +1,29 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} +KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useTicketCache=true + renewTicket=true + serviceName="{{kafka_bare_jaas_principal}}"; +}; +Client { + com.sun.security.auth.module.Krb5LoginModule required + useTicketCache=true + renewTicket=true + serviceName="zookeeper"; +}; http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 new file mode 100644 index 0000000..56c558d --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 @@ -0,0 +1,41 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} +KafkaServer { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="{{kafka_keytab_path}}" + storeKey=true + useTicketCache=false + serviceName="{{kafka_bare_jaas_principal}}" + principal="{{kafka_jaas_principal}}"; +}; +KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useTicketCache=true + renewTicket=true + serviceName="{{kafka_bare_jaas_principal}}"; +}; +Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="{{kafka_keytab_path}}" + storeKey=true + useTicketCache=false + serviceName="zookeeper" + principal="{{kafka_jaas_principal}}"; +}; http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 new file mode 100644 index 0000000..c4ad326 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=WARN, stderr + +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.appender.stderr.Target=System.err \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json new file mode 100644 index 0000000..9a52922 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json @@ -0,0 +1,7 @@ +{ + "general_deps" : { + "_comment" : "dependencies for KAFKA", + "KAFKA_BROKER-START" : ["ZOOKEEPER_SERVER-START", "RANGER_USERSYNC-START", "NAMENODE-START"], + "KAFKA_SERVICE_CHECK-SERVICE_CHECK": ["KAFKA_BROKER-START"] + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json new file mode 100644 index 0000000..d513075 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json @@ -0,0 +1,182 @@ +{ + "layouts": [ + { + "layout_name": "default_kafka_dashboard", + "display_name": "Standard Kafka Dashboard", + "section_name": "KAFKA_SUMMARY", + "widgetLayoutInfo": [ + { + "widget_name": "Broker Topics", + "description": "Broker Topics", + "widget_type": "GRAPH", + "is_visible": true, + "metrics": [ + { + "name": "kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate", + "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + }, + { + "name": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate", + "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + }, + { + "name": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate", + "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + } + ], + "values": [ + { + "name": "Bytes In", + "value": "${kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate}" + }, + { + "name": "Bytes Out", + "value": "${kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate}" + }, + { + "name": "Messages In", + "value": "${kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate}" + } + ], + "properties": { + "graph_type": "LINE", + "time_range": "1" + } + }, + { + "widget_name": "Active Controller Count", + "description": "Active Controller Count", + "widget_type": "GRAPH", + "is_visible": true, + "metrics": [ + { + "name": "kafka.controller.KafkaController.ActiveControllerCount._sum", + "metric_path": "metrics/kafka/controller/KafkaController/ActiveControllerCount._sum", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + } + ], + "values": [ + { + "name": "Active Controller Count", + "value": "${kafka.controller.KafkaController.ActiveControllerCount._sum}" + } + ], + "properties": { + "graph_type": "LINE", + "time_range": "1" + } + }, + { + "widget_name": "Controller Status", + "description": "Controller Status", + "widget_type": "GRAPH", + "is_visible": true, + "metrics": [ + { + "name": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate", + "metric_path": "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + }, + { + "name": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate", + "metric_path": "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + } + ], + "values": [ + { + "name": "Leader Election Rate And Time", + "value": "${kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate}" + }, + { + "name": "Unclean Leader Election", + "value": "${kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate}" + } + ], + "properties": { + "graph_type": "LINE", + "time_range": "1" + } + }, + { + "widget_name": "Replica MaxLag", + "description": "Replica MaxLag", + "widget_type": "GRAPH", + "is_visible": true, + "metrics": [ + { + "name": "kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica", + "metric_path": "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + } + ], + "values": [ + { + "name": "Replica MaxLag", + "value": "${kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica}" + } + ], + "properties": { + "graph_type": "LINE", + "time_range": "1" + } + }, + { + "widget_name": "Replica Manager", + "description": "Replica Manager", + "widget_type": "GRAPH", + "is_visible": true, + "metrics": [ + { + "name": "kafka.server.ReplicaManager.PartitionCount._sum", + "metric_path": "metrics/kafka/server/ReplicaManager/PartitionCount._sum", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + }, + { + "name": "kafka.server.ReplicaManager.UnderReplicatedPartitions", + "metric_path": "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + }, + { + "name": "kafka.server.ReplicaManager.LeaderCount._sum", + "metric_path": "metrics/kafka/server/ReplicaManager/LeaderCount._sum", + "service_name": "KAFKA", + "component_name": "KAFKA_BROKER" + } + ], + "values": [ + { + "name": "Partitions count", + "value": "${kafka.server.ReplicaManager.PartitionCount._sum}" + }, + { + "name": "Under Replicated Partitions", + "value": "${kafka.server.ReplicaManager.UnderReplicatedPartitions}" + }, + { + "name": "Leader Count", + "value": "${kafka.server.ReplicaManager.LeaderCount._sum}" + } + ], + "properties": { + "graph_type": "LINE", + "time_range": "1" + } + } + + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml new file mode 100644 index 0000000..d0326c2 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml @@ -0,0 +1,27 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<metainfo> + <schemaVersion>2.0</schemaVersion> + <services> + <service> + <name>KAFKA</name> + <version>0.10.0.3.0</version> + <extends>common-services/KAFKA/0.10.0.3.0</extends> + </service> + </services> +</metainfo>
