Repository: ambari
Updated Branches:
  refs/heads/trunk 1aad067cf -> 3817ad5da


http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
new file mode 100644
index 0000000..680dd32
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
@@ -0,0 +1,276 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import collections
+import os
+
+from resource_management.libraries.functions.version import 
format_stack_version
+from resource_management.libraries.resources.properties_file import 
PropertiesFile
+from resource_management.libraries.resources.template_config import 
TemplateConfig
+from resource_management.core.resources.system import Directory, Execute, 
File, Link
+from resource_management.core.source import StaticFile, Template, 
InlineTemplate
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.stack_features import 
check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions import Direction
+
+
+from resource_management.core.logger import Logger
+
+
+def kafka(upgrade_type=None):
+    """
+    Generate the Kafka broker configuration files and directories on this host.
+
+    Works on a mutable copy of the 'kafka-broker' configuration: sets broker.id
+    where the stack still expects Ambari to provide it, rewrites "localhost" in
+    listeners/advertised.listeners to this host's name (or sets host.name on
+    stacks without listener support), and adds the metrics collector settings
+    when a collector is configured. It then creates the log.dirs directories
+    and writes server.properties, kafka-env.sh, the optional log4j.properties
+    and JAAS files, the limits file and tools-log4j.properties, and finally
+    sets up the pid/log directory symlinks.
+
+    :param upgrade_type: None for a regular run. Any other value marks the call
+      as part of a stack upgrade/downgrade: the effective version is then taken
+      from params.version and the broker.id/port preservation logic may apply.
+    """
+    import params
+    ensure_base_directories()
+
+    kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
+    # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
+    # Starting in HDP 2.3, Kafka handles the generation of broker.id so Ambari doesn't have to.
+
+    # Outside of an upgrade the installed stack version drives the feature
+    # checks below; during an upgrade/downgrade the target version does.
+    effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version)
+    Logger.info(format("Effective stack version: {effective_version}"))
+
+    # In HDP-2.2 (Apache Kafka 0.8.1.1) we used to generate broker.ids based on hosts and add them to
+    # kafka's server.properties. In future version brokers can generate their own ids based on zookeeper seq
+    # We need to preserve the broker.id when user is upgrading from HDP-2.2 to any higher version.
+    # Once its preserved it will be written to kafka.log.dirs/meta.properties and it will be used from there on
+    # similarly we need preserve port as well during the upgrade
+
+    if upgrade_type is not None and params.upgrade_direction == Direction.UPGRADE and \
+      check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, params.current_version) and \
+      check_stack_feature(StackFeature.KAFKA_LISTENERS, params.version):
+      if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
+        # broker.id is this host's position in the sorted broker host list.
+        brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
+        kafka_server_config['broker.id'] = brokerid
+        Logger.info(format("Calculating broker.id as {brokerid}"))
+      if 'port' in kafka_server_config:
+        port = kafka_server_config['port']
+        Logger.info(format("Port config from previous verson: {port}"))
+        # Rebind `listeners` to the rewritten value before logging so the
+        # message reports what is actually stored, not the pre-update string.
+        listeners = kafka_server_config['listeners'].replace("6667", port)
+        kafka_server_config['listeners'] = listeners
+        Logger.info(format("Kafka listeners after the port update: {listeners}"))
+        del kafka_server_config['port']
+
+
+    if effective_version is not None and effective_version != "" and \
+      check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version):
+      if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
+        brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
+        kafka_server_config['broker.id'] = brokerid
+        Logger.info(format("Calculating broker.id as {brokerid}"))
+
+    # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
+    if effective_version is not None and effective_version != "" and \
+       check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
+
+       listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
+       Logger.info(format("Kafka listeners: {listeners}"))
+       kafka_server_config['listeners'] = listeners
+
+       if params.security_enabled and params.kafka_kerberos_enabled:
+         # With Kerberos enabled the advertised listeners mirror the listeners.
+         Logger.info("Kafka kerberos security is enabled.")
+         kafka_server_config['advertised.listeners'] = listeners
+         Logger.info(format("Kafka advertised listeners: {listeners}"))
+       elif 'advertised.listeners' in kafka_server_config:
+         advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+         kafka_server_config['advertised.listeners'] = advertised_listeners
+         Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+    else:
+      kafka_server_config['host.name'] = params.hostname
+
+    if params.has_metric_collector:
+      kafka_server_config['kafka.timeline.metrics.hosts'] = params.ams_collector_hosts
+      kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
+      kafka_server_config['kafka.timeline.metrics.protocol'] = params.metric_collector_protocol
+      kafka_server_config['kafka.timeline.metrics.truststore.path'] = params.metric_truststore_path
+      kafka_server_config['kafka.timeline.metrics.truststore.type'] = params.metric_truststore_type
+      kafka_server_config['kafka.timeline.metrics.truststore.password'] = params.metric_truststore_password
+
+    # log.dirs is a comma separated list; filter(None, ...) drops empty entries.
+    kafka_data_dir = kafka_server_config['log.dirs']
+    kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+    Directory(kafka_data_dirs,
+              mode=0755,
+              cd_access='a',
+              owner=params.kafka_user,
+              group=params.user_group,
+              create_parents = True,
+              recursive_ownership = True,
+    )
+
+    PropertiesFile("server.properties",
+                      dir=params.conf_dir,
+                      properties=kafka_server_config,
+                      owner=params.kafka_user,
+                      group=params.user_group,
+    )
+
+    File(format("{conf_dir}/kafka-env.sh"),
+          owner=params.kafka_user,
+          content=InlineTemplate(params.kafka_env_sh_template)
+     )
+
+    # log4j.properties is only written when kafka-log4j content was supplied.
+    if params.log4j_props is not None:
+        File(format("{conf_dir}/log4j.properties"),
+             mode=0644,
+             group=params.user_group,
+             owner=params.kafka_user,
+             content=InlineTemplate(params.log4j_props)
+         )
+
+    if params.security_enabled and params.kafka_kerberos_enabled:
+      # Prefer the user-supplied JAAS templates; otherwise fall back to the
+      # TemplateConfig resource for the same target path.
+      if params.kafka_jaas_conf_template:
+        File(format("{conf_dir}/kafka_jaas.conf"),
+             owner=params.kafka_user,
+             content=InlineTemplate(params.kafka_jaas_conf_template)
+        )
+      else:
+        TemplateConfig(format("{conf_dir}/kafka_jaas.conf"),
+                         owner=params.kafka_user)
+
+      if params.kafka_client_jaas_conf_template:
+        File(format("{conf_dir}/kafka_client_jaas.conf"),
+             owner=params.kafka_user,
+             content=InlineTemplate(params.kafka_client_jaas_conf_template)
+        )
+      else:
+        TemplateConfig(format("{conf_dir}/kafka_client_jaas.conf"),
+                       owner=params.kafka_user)
+
+    # On some OS this folder may not exist, so create it before pushing files there
+    Directory(params.limits_conf_dir,
+              create_parents = True,
+              owner='root',
+              group='root'
+    )
+
+    File(os.path.join(params.limits_conf_dir, 'kafka.conf'),
+         owner='root',
+         group='root',
+         mode=0644,
+         content=Template("kafka.conf.j2")
+    )
+
+    File(os.path.join(params.conf_dir, 'tools-log4j.properties'),
+         owner='root',
+         group='root',
+         mode=0644,
+         content=Template("tools-log4j.properties.j2")
+         )
+
+    setup_symlink(params.kafka_managed_pid_dir, params.kafka_pid_dir)
+    setup_symlink(params.kafka_managed_log_dir, params.kafka_log_dir)
+
+
+def mutable_config_dict(kafka_broker_config):
+    """
+    Return a new plain dict holding the same key/value pairs as
+    kafka_broker_config, so callers can modify it without touching the source.
+    """
+    return dict((name, setting) for name, setting in kafka_broker_config.iteritems())
+
+
+# Used to workaround the hardcoded pid/log dir used on the kafka bash process launcher
+def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
+  """
+  Make kafka_managed_dir resolve to kafka_ambari_managed_dir.
+
+  When the two paths differ, kafka_managed_dir is turned into a symlink to
+  kafka_ambari_managed_dir; a real directory found there is first copied to a
+  temporary backup and copied back once the link is in place. When the two
+  paths are equal and a symlink is still present, the link is removed and a
+  real directory is created instead.
+  """
+  import params
+
+  saved_copy = None
+  if kafka_ambari_managed_dir != kafka_managed_dir:
+    currently_link = os.path.islink(kafka_managed_dir)
+    if not currently_link and os.path.exists(kafka_managed_dir):
+      # Backup existing data before delete if config is changed repeatedly to/from
+      # default location, as there may be relevant contents (historic logs)
+      saved_copy = backup_dir_contents(kafka_managed_dir, "_tmp")
+      Directory(kafka_managed_dir,
+                action="delete",
+                create_parents=True)
+    elif currently_link and os.path.realpath(kafka_managed_dir) != kafka_ambari_managed_dir:
+      # Existing link points somewhere else: drop it so it can be recreated.
+      Link(kafka_managed_dir,
+           action="delete")
+
+    # Probe again: the resources above may have removed the path.
+    if not os.path.islink(kafka_managed_dir):
+      Link(kafka_managed_dir,
+           to=kafka_ambari_managed_dir)
+  elif os.path.islink(kafka_managed_dir):
+    # If config is changed and coincides with the kafka managed dir,
+    # remove the symlink and physically create the folder
+    Link(kafka_managed_dir,
+         action="delete")
+    Directory(kafka_managed_dir,
+              mode=0755,
+              cd_access='a',
+              owner=params.kafka_user,
+              group=params.user_group,
+              create_parents=True,
+              recursive_ownership=True,
+    )
+
+  if not saved_copy:
+    return
+
+  # Restore backed up files to current relevant dirs - only reached when a
+  # real directory was replaced by a link above.
+  for entry in os.listdir(saved_copy):
+    backed_up = os.path.join(saved_copy, entry)
+    restored = os.path.join(kafka_managed_dir, entry)
+    if os.path.isdir(backed_up):
+      Execute(('cp', '-r', backed_up, kafka_managed_dir),
+              sudo=True)
+      Execute(("chown", "-R", format("{kafka_user}:{user_group}"), restored),
+              sudo=True)
+    else:
+      File(restored,
+           owner=params.kafka_user,
+           content=StaticFile(backed_up))
+
+  # Clean up backed up folder
+  Directory(saved_copy,
+            action="delete",
+            create_parents=True)
+
+
+# Uses agent temp dir to store backup files
+def backup_dir_contents(dir_path, backup_folder_suffix):
+  """
+  Copy the top-level entries of dir_path into a backup directory and return
+  the backup directory's path.
+
+  The backup path is params.tmp_dir followed by the normalised dir_path and
+  backup_folder_suffix. Sub-directories are copied with `cp -r` and chowned to
+  the kafka user/group; plain files are written through a File resource.
+  """
+  import params
+
+  staging_dir = params.tmp_dir + os.path.normpath(dir_path) + backup_folder_suffix
+  Directory(staging_dir,
+            mode=0755,
+            cd_access='a',
+            owner=params.kafka_user,
+            group=params.user_group,
+            create_parents=True,
+            recursive_ownership=True,
+  )
+
+  # Safely copy top-level contents to backup folder
+  for entry in os.listdir(dir_path):
+    origin = os.path.join(dir_path, entry)
+    if not os.path.isdir(origin):
+      File(os.path.join(staging_dir, entry),
+           owner=params.kafka_user,
+           content=StaticFile(origin))
+      continue
+    Execute(('cp', '-r', origin, staging_dir),
+            sudo=True)
+    Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(staging_dir, entry)),
+            sudo=True)
+
+  return staging_dir
+
+def ensure_base_directories():
+  """
+  Declare the Kafka log, pid and conf directories as Directory resources with
+  mode 0755, owned by the kafka user and the cluster user group, creating
+  parents and applying ownership recursively.
+  """
+  import params
+
+  base_dirs = [params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir]
+  Directory(base_dirs,
+            mode=0755,
+            cd_access='a',
+            owner=params.kafka_user,
+            group=params.user_group,
+            create_parents=True,
+            recursive_ownership=True,
+  )

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
new file mode 100644
index 0000000..81715f9
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
@@ -0,0 +1,151 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management import Script
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute, File, Directory
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import Direction
+from resource_management.libraries.functions.version import 
format_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import 
check_process_status
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import 
check_stack_feature
+from resource_management.libraries.functions.show_logs import show_logs
+from kafka import ensure_base_directories
+
+import upgrade
+from kafka import kafka
+from setup_ranger_kafka import setup_ranger_kafka
+
+class KafkaBroker(Script):
+  """Command handlers for the Kafka broker (component name "kafka-broker")."""
+
+  def get_component_name(self):
+    """Return the component name used for this script."""
+    return "kafka-broker"
+
+  def install(self, env):
+    """Install the packages for this component."""
+    self.install_packages(env)
+
+  def configure(self, env, upgrade_type=None):
+    """Write the broker configuration by delegating to kafka()."""
+    import params
+    env.set_params(params)
+    kafka(upgrade_type=upgrade_type)
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """
+    Select the target stack/conf version and, when the upgrade crosses the
+    ACL-migration feature boundary, write configs and run the ACL migration.
+    """
+    import params
+    env.set_params(params)
+
+    target_version = params.version
+    if target_version and check_stack_feature(StackFeature.ROLLING_UPGRADE, target_version):
+      stack_select.select("kafka-broker", target_version)
+
+    if target_version and check_stack_feature(StackFeature.CONFIG_VERSIONING, target_version):
+      conf_select.select(params.stack_name, "kafka", target_version)
+
+    # This is extremely important since the migration should only be called if
+    # crossing the HDP 2.3.4.0 boundary.
+    if not (params.current_version and params.version and params.upgrade_direction):
+      return
+
+    if params.upgrade_direction == Direction.UPGRADE:
+      src_version = format_stack_version(params.current_version)
+      dst_version = format_stack_version(params.version)
+    else:
+      # These represent the original values during the UPGRADE direction
+      src_version = format_stack_version(params.version)
+      dst_version = format_stack_version(params.downgrade_from_version)
+
+    crosses_acl_boundary = (
+      not check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, src_version)
+      and check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, dst_version)
+    )
+    if crosses_acl_boundary:
+      # Calling the acl migration script requires the configs to be present.
+      self.configure(env, upgrade_type=upgrade_type)
+      upgrade.run_migration(env, upgrade_type)
+
+  def start(self, env, upgrade_type=None):
+    """
+    Configure the broker, kinit when Kerberos applies, set up the Ranger
+    plugin when supported, then start the broker unless its pid is alive.
+    """
+    import params
+    env.set_params(params)
+    self.configure(env, upgrade_type=upgrade_type)
+
+    if params.security_enabled and params.version and \
+       check_stack_feature(StackFeature.KAFKA_KERBEROS, params.version):
+      kafka_kinit_cmd = format("{kinit_path_local} -kt {kafka_keytab_path} {kafka_jaas_principal};")
+      Execute(kafka_kinit_cmd, user=params.kafka_user)
+
+    if params.is_supported_kafka_ranger:
+      setup_ranger_kafka()  # Ranger Kafka Plugin related call
+
+    start_cmd = format('source {params.conf_dir}/kafka-env.sh ; {params.kafka_bin} start')
+    # Skip the start when the pid file exists and names a running process.
+    already_running = format('ls {params.kafka_pid_file} >/dev/null 2>&1 && ps -p `cat {params.kafka_pid_file}` >/dev/null 2>&1')
+    try:
+      Execute(start_cmd,
+              user=params.kafka_user,
+              not_if=already_running)
+    except:
+      # Surface the broker logs, then let the original failure propagate.
+      show_logs(params.kafka_log_dir, params.kafka_user)
+      raise
+
+  def stop(self, env, upgrade_type=None):
+    """Stop the broker and delete its pid file."""
+    import params
+    env.set_params(params)
+    # Kafka package scripts change permissions on folders, so we have to
+    # restore permissions after installing repo version bits
+    # before attempting to stop Kafka Broker
+    ensure_base_directories()
+
+    stop_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop')
+    try:
+      Execute(stop_cmd, user=params.kafka_user)
+    except:
+      show_logs(params.kafka_log_dir, params.kafka_user)
+      raise
+
+    File(params.kafka_pid_file, action="delete")
+
+  def disable_security(self, env):
+    """
+    Run the zookeeper security migrator with --zookeeper.acl=unsecure, unless
+    there is no zookeeper connect string or zookeeper.set.acl is false.
+    """
+    import params
+    if not params.zookeeper_connect:
+      Logger.info("No zookeeper connection string. Skipping reverting ACL")
+      return
+    if not params.secure_acls:
+      Logger.info("The zookeeper.set.acl is false. Skipping reverting ACL")
+      return
+
+    migrate_cmd = "{0} --zookeeper.connect {1} --zookeeper.acl=unsecure".format(
+      params.kafka_security_migrator, params.zookeeper_connect)
+    Execute(migrate_cmd,
+            user=params.kafka_user,
+            environment={'JAVA_HOME': params.java64_home},
+            logoutput=True,
+            tries=3)
+
+  def status(self, env):
+    """Check the broker process status through its pid file."""
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.kafka_pid_file)
+
+  def get_log_folder(self):
+    """Return the Kafka log directory."""
+    import params
+    return params.kafka_log_dir
+
+  def get_user(self):
+    """Return the Kafka service user."""
+    import params
+    return params.kafka_user
+
+  def get_pid_files(self):
+    """Return a one-element list holding the broker pid file path."""
+    import status_params
+    return [status_params.kafka_pid_file]
+
+
+if __name__ == "__main__":
+  KafkaBroker().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
new file mode 100644
index 0000000..5b0be54
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
@@ -0,0 +1,341 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.version import 
format_stack_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import 
check_stack_feature
+from resource_management.libraries.functions.stack_features import 
get_stack_feature_version
+from resource_management.libraries.functions.default import default
+from utils import get_bare_principal
+from resource_management.libraries.functions.get_stack_version import 
get_stack_version
+from resource_management.libraries.functions.is_empty import is_empty
+import status_params
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import 
get_not_managed_resources
+from resource_management.libraries.functions.setup_ranger_plugin_xml import 
get_audit_configs, generate_ranger_service_config
+
+# server configurations
+# Every name bound at module level here is read by the other scripts as an
+# attribute of `params` (e.g. params.version, params.hostname in kafka.py).
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = Script.get_stack_root()
+stack_name = default("/hostLevelParams/stack_name", None)
+# command retry flag; False when the command does not carry it
+retryAble = default("/commandParams/command_retry_enabled", False)
+
+# Version being upgraded/downgraded to
+version = default("/commandParams/version", None)
+
+# Version that is CURRENT.
+current_version = default("/hostLevelParams/current_version", None)
+
+
+# stack_version is indexed directly (no fallback value), unlike the default() lookups above
+stack_version_unformatted = config['hostLevelParams']['stack_version']
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+
+# get the correct version to use for checking stack features
+version_for_stack_feature_checks = get_stack_feature_version(config)
+
+# Ranger-related stack feature flags, all evaluated against version_for_stack_feature_checks
+stack_supports_ranger_kerberos = 
check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, 
version_for_stack_feature_checks)
+stack_supports_ranger_audit_db = 
check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, 
version_for_stack_feature_checks)
+stack_supports_core_site_for_ranger_plugin = 
check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, 
version_for_stack_feature_checks)
+
+# When downgrading the 'version' and 'current_version' are both pointing to 
the downgrade-target version
+# downgrade_from_version provides the source-version the downgrade is 
happening from
+downgrade_from_version = default("/commandParams/downgrade_from_version", None)
+
+hostname = config['hostname']
+
+# default kafka parameters
+# These fixed locations are replaced below when the stack supports rolling upgrade.
+kafka_home = '/usr/lib/kafka'
+kafka_bin = kafka_home+'/bin/kafka'
+conf_dir = "/etc/kafka/conf"
+limits_conf_dir = "/etc/security/limits.d"
+
+# Used while upgrading the stack in a kerberized cluster and running 
kafka-acls.sh
+zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", 
None)
+
+# limits for the kafka user; fall back to 128000 / 65536 when kafka-env does not set them
+kafka_user_nofile_limit = 
default('/configurations/kafka-env/kafka_user_nofile_limit', 128000)
+kafka_user_nproc_limit = 
default('/configurations/kafka-env/kafka_user_nproc_limit', 65536)
+
+# parameters for 2.2+
+# paths are rebuilt under <stack_root>/current/kafka-broker
+if stack_version_formatted and 
check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+  kafka_home = os.path.join(stack_root,  "current", "kafka-broker")
+  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
+  conf_dir = os.path.join(kafka_home, "config")
+
+kafka_user = config['configurations']['kafka-env']['kafka_user']
+kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
+kafka_pid_dir = status_params.kafka_pid_dir
+kafka_pid_file = kafka_pid_dir+"/kafka.pid"
+# This is hardcoded on the kafka bash process lifecycle on which we have no 
control over
+kafka_managed_pid_dir = "/var/run/kafka"
+kafka_managed_log_dir = "/var/log/kafka"
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+kafka_env_sh_template = config['configurations']['kafka-env']['content']
+# optional JAAS templates; None when the config type/content is absent
+kafka_jaas_conf_template = default("/configurations/kafka_jaas_conf/content", 
None)
+kafka_client_jaas_conf_template = 
default("/configurations/kafka_client_jaas_conf/content", None)
+kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
+# sorted in place
+kafka_hosts.sort()
+
+zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
+zookeeper_hosts.sort()
+secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
+# built from kafka_home after the rolling-upgrade override above
+kafka_security_migrator = os.path.join(kafka_home, "bin", 
"zookeeper-security-migration.sh")
+
+#Kafka log4j
+# rotation settings; 256 / 20 are used when kafka-log4j does not provide them
+kafka_log_maxfilesize = 
default('/configurations/kafka-log4j/kafka_log_maxfilesize',256)
+kafka_log_maxbackupindex = 
default('/configurations/kafka-log4j/kafka_log_maxbackupindex',20)
+controller_log_maxfilesize = 
default('/configurations/kafka-log4j/controller_log_maxfilesize',256)
+controller_log_maxbackupindex = 
default('/configurations/kafka-log4j/controller_log_maxbackupindex',20)
+
+# log4j_props stays None when kafka-log4j has no 'content'; kafka() in kafka.py
+# only writes log4j.properties when it is not None
+if (('kafka-log4j' in config['configurations']) and ('content' in 
config['configurations']['kafka-log4j'])):
+    log4j_props = config['configurations']['kafka-log4j']['content']
+else:
+    log4j_props = None
+
+# NOTE(review): ganglia_server and ganglia_report_interval are bound only in
+# the first branch; the else branch leaves them undefined.
+if 'ganglia_server_host' in config['clusterHostInfo'] and \
+    len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+# port/protocol stay empty strings unless a metrics collector is found below
+metric_collector_port = ""
+metric_collector_protocol = ""
+metric_truststore_path= 
default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+metric_truststore_type= 
default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+metric_truststore_password= 
default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+
+set_instanceId = "false"
+cluster_name = config["clusterName"]
+
+# an external collector listed in cluster-env takes precedence over the
+# cluster's own metrics_collector_hosts (which are joined with commas)
+if 'cluster-env' in config['configurations'] and \
+        'metrics_collector_external_hosts' in 
config['configurations']['cluster-env']:
+  ams_collector_hosts = 
config['configurations']['cluster-env']['metrics_collector_external_hosts']
+  set_instanceId = "true"
+else:
+  ams_collector_hosts = 
",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+
+# true when ams_collector_hosts is non-empty
+has_metric_collector = not len(ams_collector_hosts) == 0
+
+if has_metric_collector:
+  # port: external setting if present, else the part after ':' in the ams-site
+  # webapp address, else '6188'
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_external_port' in 
config['configurations']['cluster-env']:
+    metric_collector_port = 
config['configurations']['cluster-env']['metrics_collector_external_port']
+  else:
+    metric_collector_web_address = 
default("/configurations/ams-site/timeline.metrics.service.webapp.address", 
"0.0.0.0:6188")
+    if metric_collector_web_address.find(':') != -1:
+      metric_collector_port = metric_collector_web_address.split(':')[1]
+    else:
+      metric_collector_port = '6188'
+  # protocol is https only when the ams-site http policy is HTTPS_ONLY
+  if default("/configurations/ams-site/timeline.metrics.service.http.policy", 
"HTTP_ONLY") == "HTTPS_ONLY":
+    metric_collector_protocol = 'https'
+  else:
+    metric_collector_protocol = 'http'
+  pass
+
+# Security-related params
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+# true only when security.inter.broker.protocol is set to PLAINTEXTSASL or SASL_PLAINTEXT
+kafka_kerberos_enabled = (('security.inter.broker.protocol' in 
config['configurations']['kafka-broker']) and
+                         
((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == 
"PLAINTEXTSASL") or
+                          
(config['configurations']['kafka-broker']['security.inter.broker.protocol'] == 
"SASL_PLAINTEXT")))
+
+
+# Kerberos principal/keytab values are derived only when security is enabled,
+# kafka-env has kafka_principal_name and the stack supports KAFKA_KERBEROS.
+# NOTE(review): kafka_bare_jaas_principal is bound only in the first branch;
+# the else branch leaves it undefined.
+if security_enabled and stack_version_formatted != "" and 
'kafka_principal_name' in config['configurations']['kafka-env'] \
+  and check_stack_feature(StackFeature.KAFKA_KERBEROS, 
stack_version_formatted):
+    # _HOST in the principal is replaced by this host's lower-cased name
+    _hostname_lowercase = config['hostname'].lower()
+    _kafka_principal_name = 
config['configurations']['kafka-env']['kafka_principal_name']
+    kafka_jaas_principal = 
_kafka_principal_name.replace('_HOST',_hostname_lowercase)
+    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+    kafka_kerberos_params = "-Djava.security.auth.login.config="+ conf_dir 
+"/kafka_jaas.conf"
+else:
+    kafka_kerberos_params = ''
+    kafka_jaas_principal = None
+    kafka_keytab_path = None
+
+# for curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+
+# ranger kafka plugin section start
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+# true when at least one ranger admin host is listed
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+
+# ranger support xml_configuration flag, instead of depending on ranger 
xml_configurations_supported/ranger-env, using stack feature
+xml_configurations_supported = 
check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, 
version_for_stack_feature_checks)
+
+# ambari-server hostname
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+ranger_admin_log_dir = 
default("/configurations/ranger-env/ranger_admin_log_dir","/var/log/ranger/admin")
+
+# ranger kafka plugin enabled property
+# NOTE(review): this path has no leading '/' unlike the other default() lookups
+# in this file - confirm default() treats both forms the same.
+enable_ranger_kafka = 
default("configurations/ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled",
 "No")
+# converted to a boolean: True only for a case-insensitive 'yes'
+enable_ranger_kafka = True if enable_ranger_kafka.lower() == 'yes' else False
+
+# ranger kafka-plugin supported flag, instead of dependending on 
is_supported_kafka_ranger/kafka-env.xml, using stack feature
+is_supported_kafka_ranger = 
check_stack_feature(StackFeature.KAFKA_RANGER_PLUGIN_SUPPORT, 
version_for_stack_feature_checks)
+
+# ranger kafka properties
+if enable_ranger_kafka and is_supported_kafka_ranger:
+  # get ranger policy url
+  policymgr_mgr_url = 
config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.policy.rest.url']
+
+  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
+    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')
+
+  # ranger audit db user
+  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 
'rangerlogger')
+
+  xa_audit_db_password = ''
+  if not 
is_empty(config['configurations']['admin-properties']['audit_db_password']) and 
stack_supports_ranger_audit_db and has_ranger_admin:
+    xa_audit_db_password = 
config['configurations']['admin-properties']['audit_db_password']
+
+  # ranger kafka service/repository name
+  repo_name = str(config['clusterName']) + '_kafka'
+  repo_name_value = 
config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.service.name']
+  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
+    repo_name = repo_name_value
+
+  ranger_env = config['configurations']['ranger-env']
+
+  # create ranger-env config having external ranger credential properties
+  if not has_ranger_admin and enable_ranger_kafka:
+    external_admin_username = 
default('/configurations/ranger-kafka-plugin-properties/external_admin_username',
 'admin')
+    external_admin_password = 
default('/configurations/ranger-kafka-plugin-properties/external_admin_password',
 'admin')
+    external_ranger_admin_username = 
default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_username',
 'amb_ranger_admin')
+    external_ranger_admin_password = 
default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_password',
 'amb_ranger_admin')
+    ranger_env = {}
+    ranger_env['admin_username'] = external_admin_username
+    ranger_env['admin_password'] = external_admin_password
+    ranger_env['ranger_admin_username'] = external_ranger_admin_username
+    ranger_env['ranger_admin_password'] = external_ranger_admin_password
+
+  ranger_plugin_properties = 
config['configurations']['ranger-kafka-plugin-properties']
+  ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
+  ranger_kafka_audit_attrs = 
config['configuration_attributes']['ranger-kafka-audit']
+  ranger_kafka_security = config['configurations']['ranger-kafka-security']
+  ranger_kafka_security_attrs = 
config['configuration_attributes']['ranger-kafka-security']
+  ranger_kafka_policymgr_ssl = 
config['configurations']['ranger-kafka-policymgr-ssl']
+  ranger_kafka_policymgr_ssl_attrs = 
config['configuration_attributes']['ranger-kafka-policymgr-ssl']
+
+  policy_user = 
config['configurations']['ranger-kafka-plugin-properties']['policy_user']
+
+  ranger_plugin_config = {
+    'username' : 
config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
+    'password' : 
config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD'],
+    'zookeeper.connect' : 
config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
+    'commonNameForCertificate' : 
config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
+  }
+
+  kafka_ranger_plugin_repo = {
+    'isEnabled': 'true',
+    'configs': ranger_plugin_config,
+    'description': 'kafka repo',
+    'name': repo_name,
+    'repositoryType': 'kafka',
+    'type': 'kafka',
+    'assetType': '1'
+  }
+
+  custom_ranger_service_config = 
generate_ranger_service_config(ranger_plugin_properties)
+  if len(custom_ranger_service_config) > 0:
+    ranger_plugin_config.update(custom_ranger_service_config)
+
+  if stack_supports_ranger_kerberos and security_enabled:
+    ranger_plugin_config['policy.download.auth.users'] = kafka_user
+    ranger_plugin_config['tag.download.auth.users'] = kafka_user
+    ranger_plugin_config['ambari.service.check.user'] = policy_user
+
+  downloaded_custom_connector = None
+  previous_jdbc_jar_name = None
+  driver_curl_source = None
+  driver_curl_target = None
+  previous_jdbc_jar = None
+
+  if has_ranger_admin and stack_supports_ranger_audit_db:
+    xa_audit_db_flavor = 
config['configurations']['admin-properties']['DB_FLAVOR']
+    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = 
get_audit_configs(config)
+
+    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+    driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+    previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}") 
if stack_supports_ranger_audit_db else None
+
+  xa_audit_db_is_enabled = False
+  if xml_configurations_supported and stack_supports_ranger_audit_db:
+    xa_audit_db_is_enabled = 
config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']
+
+  xa_audit_hdfs_is_enabled = 
default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', 
False)
+  ssl_keystore_password = 
config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password']
 if xml_configurations_supported else None
+  ssl_truststore_password = 
config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password']
 if xml_configurations_supported else None
+  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
+
+  stack_version = get_stack_version('kafka-broker')
+  setup_ranger_env_sh_source = 
format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
+  setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")
+
+  # for SQLA explicitly disable audit to DB for Ranger
+  if has_ranger_admin and stack_supports_ranger_audit_db and 
xa_audit_db_flavor.lower() == 'sqla':
+    xa_audit_db_is_enabled = False
+
+# need this to capture cluster name from where ranger kafka plugin is enabled
+cluster_name = config['clusterName']
+
+# ranger kafka plugin section end
+
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = not len(namenode_hosts) == 0
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if 
has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] 
if has_namenode else None
+hdfs_principal_name = 
config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode 
else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if 
has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = 
get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', 
None))
+
+import functools
+# Create a partial function with the arguments common to every HdfsResource call.
+# To create/delete an HDFS directory or file, or to copy from local, call params.HdfsResource in code.
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = 
"/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources()
+)

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
new file mode 100644
index 0000000..0f3a417
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.validate import 
call_and_match_output
+from resource_management.libraries.functions.format import format
+from resource_management.core.logger import Logger
+from resource_management.core import sudo
+import subprocess
+
+class ServiceCheck(Script):
+  """
+  Kafka service check: lists the "ambari_kafka_service_check" topic and
+  creates it when it is not listed yet.
+  """
+
+  def service_check(self, env):
+    """
+    List the service-check topic and create it if it is missing.
+
+    :param env: Environment; the params module is registered on it via set_params.
+    """
+    import params
+    env.set_params(params)
+
+    # TODO, Kafka Service check should be more robust , It should get all the broker_hosts
+    # Produce some messages and check if consumer reads same no.of messages.
+
+    # NOTE: the format() strings below refer to these local names
+    # (kafka_config, topic, create_topic_cmd_*_output), so keep them as they are.
+    kafka_config = self.read_kafka_config()
+    topic = "ambari_kafka_service_check"
+    create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
+    create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
+    source_cmd = format("source {conf_dir}/kafka-env.sh")
+    topic_exists_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --topic {topic} --list")
+    topic_exists_cmd_p = subprocess.Popen(topic_exists_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    topic_exists_cmd_out, topic_exists_cmd_err = topic_exists_cmd_p.communicate()
+    # run create topic command only if the topic doesn't exists
+    if topic not in topic_exists_cmd_out:
+      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
+      command = source_cmd + " ; " + create_topic_cmd
+      Logger.info("Running kafka create topic command: %s" % command)
+      # Either "created" or "already exists" output counts as success.
+      call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
+
+  def read_kafka_config(self):
+    """
+    Parse {conf_dir}/server.properties into a dictionary.
+
+    Blank lines, comment lines (first non-blank character "#") and lines
+    without "=" are ignored. Only the first "=" separates key from value, so
+    values may contain "=" themselves. Keys and values are stripped of
+    surrounding whitespace.
+
+    :return: dict mapping property name to property value (both strings)
+    """
+    import params
+
+    kafka_config = {}
+    content = sudo.read_file(params.conf_dir + "/server.properties")
+    for line in content.splitlines():
+      stripped = line.strip()
+      if not stripped or stripped.startswith("#"):
+        continue
+
+      # Split on the first "=" only; a plain split("=") fails to unpack
+      # as soon as the value itself contains "=".
+      key, separator, value = stripped.partition("=")
+      if not separator:
+        # Not a key=value line: skip it instead of aborting the service check.
+        continue
+
+      kafka_config[key.strip()] = value.strip()
+
+    return kafka_config
+
+if __name__ == "__main__":
+    ServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
new file mode 100644
index 0000000..e9719aa
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from resource_management.core.logger import Logger
+from resource_management.core.resources import File, Execute
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.setup_ranger_plugin_xml import 
setup_core_site_for_required_plugins
+
+def setup_ranger_kafka():
+  """
+  Set up the Ranger plugin for the Kafka broker when it is enabled in params.
+
+  Steps, in order:
+    * create the /ranger/audit and /ranger/audit/kafka HDFS directories when
+      XML configurations are supported, audit to HDFS is enabled and a
+      NameNode is present;
+    * call setup_ranger_plugin with the Kafka plugin settings from params;
+    * copy kafka-ranger-env.sh into place unless the target already exists,
+      then set its owner, group and mode;
+    * create core-site.xml for the plugin when the stack supports it, a
+      NameNode is present and security is enabled.
+
+  When the plugin is not enabled only an informational message is logged.
+  """
+  import params
+
+  if not params.enable_ranger_kafka:
+    Logger.info('Ranger Kafka plugin is not enabled')
+    return
+
+  from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin
+
+  if params.retryAble:
+    Logger.info("Kafka: Setup ranger: command retry enables thus retrying if ranger admin is down !")
+  else:
+    Logger.info("Kafka: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")
+
+  if params.xml_configurations_supported and params.xa_audit_hdfs_is_enabled and params.has_namenode:
+    params.HdfsResource("/ranger/audit",
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.hdfs_user,
+                       group=params.hdfs_user,
+                       mode=0o755,
+                       recursive_chmod=True
+    )
+    params.HdfsResource("/ranger/audit/kafka",
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.kafka_user,
+                       group=params.kafka_user,
+                       mode=0o700,
+                       recursive_chmod=True
+    )
+    params.HdfsResource(None, action="execute")
+
+  setup_ranger_plugin('kafka-broker', 'kafka', params.previous_jdbc_jar,
+                      params.downloaded_custom_connector, params.driver_curl_source,
+                      params.driver_curl_target, params.java64_home,
+                      params.repo_name, params.kafka_ranger_plugin_repo,
+                      params.ranger_env, params.ranger_plugin_properties,
+                      params.policy_user, params.policymgr_mgr_url,
+                      params.enable_ranger_kafka,
+                      conf_dict=params.conf_dir,
+                      component_user=params.kafka_user,
+                      component_group=params.user_group,
+                      cache_service_list=['kafka'],
+                      plugin_audit_properties=params.ranger_kafka_audit,
+                      plugin_audit_attributes=params.ranger_kafka_audit_attrs,
+                      plugin_security_properties=params.ranger_kafka_security,
+                      plugin_security_attributes=params.ranger_kafka_security_attrs,
+                      plugin_policymgr_ssl_properties=params.ranger_kafka_policymgr_ssl,
+                      plugin_policymgr_ssl_attributes=params.ranger_kafka_policymgr_ssl_attrs,
+                      component_list=['kafka-broker'],
+                      audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                      credential_file=params.credential_file,
+                      xa_audit_db_password=params.xa_audit_db_password,
+                      ssl_truststore_password=params.ssl_truststore_password,
+                      ssl_keystore_password=params.ssl_keystore_password,
+                      api_version='v2',
+                      skip_if_rangeradmin_down=not params.retryAble,
+                      is_security_enabled=params.security_enabled,
+                      is_stack_supports_ranger_kerberos=params.stack_supports_ranger_kerberos,
+                      component_user_principal=params.kafka_jaas_principal if params.security_enabled else None,
+                      component_user_keytab=params.kafka_keytab_path if params.security_enabled else None)
+
+  # The copy is skipped when the target already exists (not_if); owner, group
+  # and mode are applied by the File resource in either case.
+  Execute(('cp', '--remove-destination', params.setup_ranger_env_sh_source, params.setup_ranger_env_sh_target),
+    not_if=format("test -f {setup_ranger_env_sh_target}"),
+    sudo=True
+  )
+  File(params.setup_ranger_env_sh_target,
+    owner = params.kafka_user,
+    group = params.user_group,
+    mode = 0o755
+  )
+
+  if params.stack_supports_core_site_for_ranger_plugin and params.has_namenode and params.security_enabled:
+    Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating core-site.xml from namenode configurations")
+    setup_core_site_for_required_plugins(component_user=params.kafka_user, component_group=params.user_group, create_core_site_path=params.conf_dir, config=params.config)
+  else:
+    # Any of the three conditions may be the reason, not only missing stack support.
+    Logger.info("Skipping core-site.xml creation for Ranger plugin: it requires stack support, a NameNode and enabled security")

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
new file mode 100644
index 0000000..57bdf5e
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+
+# Configuration dictionary returned by Script.get_config().
+config = Script.get_config()
+
+# Kafka pid directory as configured in kafka-env.
+kafka_pid_dir = config['configurations']['kafka-env']['kafka_pid_dir']
+# Path of kafka.pid inside kafka_pid_dir.
+kafka_pid_file = format("{kafka_pid_dir}/kafka.pid")

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
new file mode 100644
index 0000000..b6e4046
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
@@ -0,0 +1,78 @@
+
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+
+
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions import Direction
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+
+def run_migration(env, upgrade_type):
+  """
+  Run the Kafka ACL migration script for an upgrade or a downgrade, if the script exists.
+
+  The script was introduced in HDP 2.3.4.0 and requires stopping all Kafka
+  brokers first. Configs must be present. Nothing is run when cluster
+  security is disabled or when the script is not found on disk.
+
+  :param env: Environment.
+  :param upgrade_type: "rolling" or "nonrolling"
+  :raises Fail: when upgrade_type, upgrade_direction, downgrade_from_version
+                (on downgrade) or zookeeper_connect is missing.
+  """
+  import params
+
+  if upgrade_type is None:
+    raise Fail('Parameter "upgrade_type" is missing.')
+
+  direction = params.upgrade_direction
+  if direction is None:
+    raise Fail('Parameter "upgrade_direction" is missing.')
+
+  if direction == Direction.DOWNGRADE and params.downgrade_from_version is None:
+    raise Fail('Parameter "downgrade_from_version" is missing.')
+
+  if not params.security_enabled:
+    Logger.info("Skip running the Kafka ACL migration script since cluster security is not enabled.")
+    return
+
+  Logger.info("Upgrade type: {0}, direction: {1}".format(str(upgrade_type), direction))
+
+  # If the schema upgrade script exists in the version upgrading to, then
+  # attempt to upgrade/downgrade it while still using the present bits.
+  if direction == Direction.UPGRADE:
+    acls_script_path = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh")
+    acls_flag = "--upgradeAcls"
+  elif direction == Direction.DOWNGRADE:
+    acls_script_path = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
+    acls_flag = "--downgradeAcls"
+  else:
+    # Neither upgrade nor downgrade: there is no script to look for.
+    return
+
+  if not os.path.exists(acls_script_path):
+    Logger.info("Did not find Kafka acls script: {0}".format(acls_script_path))
+    return
+
+  Logger.info("Found Kafka acls script: {0}".format(acls_script_path))
+  if params.zookeeper_connect is None:
+    raise Fail("Could not retrieve property kafka-broker/zookeeper.connect")
+
+  command_template = "{0} --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect={1} {2}"
+  acls_command = command_template.format(acls_script_path, params.zookeeper_connect, acls_flag)
+
+  Execute(acls_command,
+          user=params.kafka_user,
+          logoutput=True)

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
new file mode 100644
index 0000000..2f1fa5e
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+def get_bare_principal(normalized_principal_name):
+    """
+    Given a normalized principal name (nimbus/host.example.com@EXAMPLE.COM) returns just the
+    primary component (nimbus)
+    :param normalized_principal_name: a string containing the principal name to process
+    :return: a string containing the primary component value or None if not valid
+    """
+
+    # Handle None / empty input up front so that "match" is always bound
+    # before it is inspected.
+    if not normalized_principal_name:
+        return None
+
+    # primary [ "/" instance ] [ "@" realm ]; only the primary is captured.
+    match = re.match(r"([^/@]+)(?:/[^@]*)?(?:@.*)?", normalized_principal_name)
+
+    return match.group(1) if match else None

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
new file mode 100644
index 0000000..5b8f896
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
@@ -0,0 +1,92 @@
+{#
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements.  See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership.  The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License.  You may obtain a copy of the License at
+ #
+ #   http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #}
+{
+  "input":[
+    {
+      "type":"kafka_controller",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', 
'/var/log/kafka')}}/controller.log"
+    },
+    {
+      "type":"kafka_request",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', 
'/var/log/kafka')}}/kafka-request.log"
+    },
+    {
+      "type":"kafka_logcleaner",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', 
'/var/log/kafka')}}/log-cleaner.log"
+    },
+    {
+      "type":"kafka_server",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', 
'/var/log/kafka')}}/server.log"
+    },
+    {
+      "type":"kafka_statechange",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', 
'/var/log/kafka')}}/state-change.log"
+    }
+  ],
+  "filter":[
+    {
+      "filter":"grok",
+      "conditions":{
+        "fields":{
+          "type":[
+            "kafka_controller",
+            "kafka_request",
+            "kafka_logcleaner"
+          ]
+        }
+      },
+      "log4j_format":"[%d] %p %m (%c)%n",
+      "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])",
+      
"message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+          }
+        }
+      }
+    },
+    {
+      "filter":"grok",
+      "comment":"Suppose to be same log4j pattern as other kafka processes, 
but some reason thread is not printed",
+      "conditions":{
+        "fields":{
+          "type":[
+            "kafka_server",
+            "kafka_statechange"
+          ]
+        }
+      },
+      "log4j_format":"[%d] %p %m (%c)%n",
+      "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])",
+      
"message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+          }
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
new file mode 100644
index 0000000..9e18e1d
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
@@ -0,0 +1,35 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{kafka_user}}   - nofile   {{kafka_user_nofile_limit}}
+{{kafka_user}}   - nproc    {{kafka_user_nproc_limit}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
new file mode 100644
index 0000000..7f81d85
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
@@ -0,0 +1,29 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="zookeeper";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
new file mode 100644
index 0000000..56c558d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
@@ -0,0 +1,41 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaServer {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="{{kafka_bare_jaas_principal}}"
+   principal="{{kafka_jaas_principal}}";
+};
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="zookeeper"
+   principal="{{kafka_jaas_principal}}";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
new file mode 100644
index 0000000..c4ad326
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=WARN, stderr
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stderr.Target=System.err
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
new file mode 100644
index 0000000..9a52922
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
@@ -0,0 +1,7 @@
+{
+  "general_deps" : {
+    "_comment" : "dependencies for KAFKA",
+    "KAFKA_BROKER-START" : ["ZOOKEEPER_SERVER-START", "RANGER_USERSYNC-START", "NAMENODE-START"],
+    "KAFKA_SERVICE_CHECK-SERVICE_CHECK": ["KAFKA_BROKER-START"]
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
new file mode 100644
index 0000000..d513075
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
@@ -0,0 +1,182 @@
+{
+  "layouts": [
+    {
+      "layout_name": "default_kafka_dashboard",
+      "display_name": "Standard Kafka Dashboard",
+      "section_name": "KAFKA_SUMMARY",
+      "widgetLayoutInfo": [
+        {
+          "widget_name": "Broker Topics",
+          "description": "Broker Topics",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Bytes In",
+              "value": "${kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate}"
+            },
+            {
+              "name": "Bytes Out",
+              "value": "${kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate}"
+            },
+            {
+              "name": "Messages In",
+              "value": "${kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Active Controller Count",
+          "description": "Active Controller Count",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.controller.KafkaController.ActiveControllerCount._sum",
+              "metric_path": "metrics/kafka/controller/KafkaController/ActiveControllerCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Active Controller Count",
+              "value": "${kafka.controller.KafkaController.ActiveControllerCount._sum}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Controller Status",
+          "description": "Controller Status",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate",
+              "metric_path": "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Leader Election Rate And Time",
+              "value": "${kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate}"
+            },
+            {
+              "name": "Unclean Leader Election",
+              "value": "${kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Replica MaxLag",
+          "description": "Replica MaxLag",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica",
+              "metric_path": "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Replica MaxLag",
+              "value": "${kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Replica Manager",
+          "description": "Replica Manager",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.ReplicaManager.PartitionCount._sum",
+              "metric_path": "metrics/kafka/server/ReplicaManager/PartitionCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.ReplicaManager.UnderReplicatedPartitions",
+              "metric_path": "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.ReplicaManager.LeaderCount._sum",
+              "metric_path": "metrics/kafka/server/ReplicaManager/LeaderCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Partitions count",
+              "value": "${kafka.server.ReplicaManager.PartitionCount._sum}"
+            },
+            {
+              "name": "Under Replicated Partitions",
+              "value": "${kafka.server.ReplicaManager.UnderReplicatedPartitions}"
+            },
+            {
+              "name": "Leader Count",
+              "value": "${kafka.server.ReplicaManager.LeaderCount._sum}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        }
+
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
new file mode 100644
index 0000000..d0326c2
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<metainfo>
+    <schemaVersion>2.0</schemaVersion>
+    <services>
+        <service>
+            <name>KAFKA</name>
+            <version>0.10.0.3.0</version>
+            <extends>common-services/KAFKA/0.10.0.3.0</extends>
+        </service>
+    </services>
+</metainfo>

Reply via email to