http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
new file mode 100644
index 0000000..12ccef6
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.version import 
format_stack_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import 
check_stack_feature
+from resource_management.libraries.functions.default import default
+from utils import get_bare_principal
+from resource_management.libraries.functions.get_stack_version import 
get_stack_version
+from resource_management.libraries.functions.is_empty import is_empty
+import status_params
+from resource_management.core.logger import Logger
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import 
get_not_managed_resources
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = Script.get_stack_root()
+stack_name = default("/hostLevelParams/stack_name", None)
+retryAble = default("/commandParams/command_retry_enabled", False)
+
+# Version being upgraded/downgraded to
+version = default("/commandParams/version", None)
+
+# Version that is CURRENT.
+current_version = default("/hostLevelParams/current_version", None)
+
+host_sys_prepped = default("/hostLevelParams/host_sys_prepped", False)
+
+stack_version_unformatted = config['hostLevelParams']['stack_version']
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+stack_supports_ranger_kerberos = stack_version_formatted and 
check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, 
stack_version_formatted)
+
+# When downgrading the 'version' and 'current_version' are both pointing to 
the downgrade-target version
+# downgrade_from_version provides the source-version the downgrade is 
happening from
+downgrade_from_version = default("/commandParams/downgrade_from_version", None)
+
+hostname = config['hostname']
+
+# default kafka parameters
+kafka_home = '/usr/lib/kafka'
+kafka_bin = kafka_home+'/bin/kafka'
+conf_dir = "/etc/kafka/conf"
+limits_conf_dir = "/etc/security/limits.d"
+
+# Used while upgrading the stack in a kerberized cluster and running 
kafka-acls.sh
+zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", 
None)
+
+kafka_user_nofile_limit = 
config['configurations']['kafka-env']['kafka_user_nofile_limit']
+kafka_user_nproc_limit = 
config['configurations']['kafka-env']['kafka_user_nproc_limit']
+
+# parameters for 2.2+
+if stack_version_formatted and 
check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+  kafka_home = os.path.join(stack_root,  "current", "kafka-broker")
+  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
+  conf_dir = os.path.join(kafka_home, "config")
+
+kafka_user = config['configurations']['kafka-env']['kafka_user']
+kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
+kafka_pid_dir = status_params.kafka_pid_dir
+kafka_pid_file = kafka_pid_dir+"/kafka.pid"
+# This is hardcoded on the kafka bash process lifecycle on which we have no 
control over
+kafka_managed_pid_dir = "/var/run/kafka"
+kafka_managed_log_dir = "/var/log/kafka"
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+kafka_env_sh_template = config['configurations']['kafka-env']['content']
+kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
+kafka_hosts.sort()
+
+zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
+zookeeper_hosts.sort()
+
+if (('kafka-log4j' in config['configurations']) and ('content' in 
config['configurations']['kafka-log4j'])):
+    log4j_props = config['configurations']['kafka-log4j']['content']
+else:
+    log4j_props = None
+
+if 'ganglia_server_host' in config['clusterHostInfo'] and \
+    len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+metric_collector_host = ""
+metric_collector_port = ""
+metric_collector_protocol = ""
+metric_truststore_path= 
default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+metric_truststore_type= 
default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+metric_truststore_password= 
default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+
+ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+
+if has_metric_collector:
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_vip_host' in config['configurations']['cluster-env']:
+    metric_collector_host = 
config['configurations']['cluster-env']['metrics_collector_vip_host']
+  else:
+    metric_collector_host = ams_collector_hosts[0]
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_vip_port' in config['configurations']['cluster-env']:
+    metric_collector_port = 
config['configurations']['cluster-env']['metrics_collector_vip_port']
+  else:
+    metric_collector_web_address = 
default("/configurations/ams-site/timeline.metrics.service.webapp.address", 
"localhost:6188")
+    if metric_collector_web_address.find(':') != -1:
+      metric_collector_port = metric_collector_web_address.split(':')[1]
+    else:
+      metric_collector_port = '6188'
+  if default("/configurations/ams-site/timeline.metrics.service.http.policy", 
"HTTP_ONLY") == "HTTPS_ONLY":
+    metric_collector_protocol = 'https'
+  else:
+    metric_collector_protocol = 'http'
+  pass
+# Security-related params
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+kafka_kerberos_enabled = (('security.inter.broker.protocol' in 
config['configurations']['kafka-broker']) and
+                         
((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == 
"PLAINTEXTSASL") or
+                          
(config['configurations']['kafka-broker']['security.inter.broker.protocol'] == 
"SASL_PLAINTEXT")))
+
+
+if security_enabled and stack_version_formatted != "" and 
'kafka_principal_name' in config['configurations']['kafka-env'] \
+  and check_stack_feature(StackFeature.KAFKA_KERBEROS, 
stack_version_formatted):
+    _hostname_lowercase = config['hostname'].lower()
+    _kafka_principal_name = 
config['configurations']['kafka-env']['kafka_principal_name']
+    kafka_jaas_principal = 
_kafka_principal_name.replace('_HOST',_hostname_lowercase)
+    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+    kafka_kerberos_params = "-Djava.security.auth.login.config="+ conf_dir 
+"/kafka_jaas.conf"
+else:
+    kafka_kerberos_params = ''
+
+# ***********************  RANGER PLUGIN CHANGES ***********************
+# ranger host
+# **********************************************************************
+stack_supports_ranger_audit_db = stack_version_formatted and 
check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, 
stack_version_formatted)
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+xml_configurations_supported = 
config['configurations']['ranger-env']['xml_configurations_supported']
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+ranger_admin_log_dir = 
default("/configurations/ranger-env/ranger_admin_log_dir","/var/log/ranger/admin")
+is_supported_kafka_ranger = 
config['configurations']['kafka-env']['is_supported_kafka_ranger']
+
+#ranger kafka properties
+if has_ranger_admin and is_supported_kafka_ranger:
+
+  enable_ranger_kafka = 
config['configurations']['ranger-kafka-plugin-properties']['ranger-kafka-plugin-enabled']
+  enable_ranger_kafka = not is_empty(enable_ranger_kafka) and 
enable_ranger_kafka.lower() == 'yes'
+  policymgr_mgr_url = 
config['configurations']['admin-properties']['policymgr_external_url']
+  xa_audit_db_flavor = 
config['configurations']['admin-properties']['DB_FLAVOR']
+  xa_audit_db_flavor = xa_audit_db_flavor.lower() if xa_audit_db_flavor else 
None
+  xa_audit_db_name = 
config['configurations']['admin-properties']['audit_db_name']
+  xa_audit_db_user = 
config['configurations']['admin-properties']['audit_db_user']
+  xa_audit_db_password = 
unicode(config['configurations']['admin-properties']['audit_db_password']) if 
stack_supports_ranger_audit_db else None
+  xa_db_host = config['configurations']['admin-properties']['db_host']
+  repo_name = str(config['clusterName']) + '_kafka'
+
+  ranger_env = config['configurations']['ranger-env']
+  ranger_plugin_properties = 
config['configurations']['ranger-kafka-plugin-properties']
+
+  ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
+  ranger_kafka_audit_attrs = 
config['configuration_attributes']['ranger-kafka-audit']
+  ranger_kafka_security = config['configurations']['ranger-kafka-security']
+  ranger_kafka_security_attrs = 
config['configuration_attributes']['ranger-kafka-security']
+  ranger_kafka_policymgr_ssl = 
config['configurations']['ranger-kafka-policymgr-ssl']
+  ranger_kafka_policymgr_ssl_attrs = 
config['configuration_attributes']['ranger-kafka-policymgr-ssl']
+
+  policy_user = 
config['configurations']['ranger-kafka-plugin-properties']['policy_user']
+
+  ranger_plugin_config = {
+    'username' : 
config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
+    'password' : 
unicode(config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']),
+    'zookeeper.connect' : 
config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
+    'commonNameForCertificate' : 
config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
+  }
+
+  kafka_ranger_plugin_repo = {
+    'isEnabled': 'true',
+    'configs': ranger_plugin_config,
+    'description': 'kafka repo',
+    'name': repo_name,
+    'repositoryType': 'kafka',
+    'type': 'kafka',
+    'assetType': '1'
+  }
+
+  if stack_supports_ranger_kerberos and security_enabled:
+    ranger_plugin_config['policy.download.auth.users'] = kafka_user
+    ranger_plugin_config['tag.download.auth.users'] = kafka_user
+
+
+  #For curl command in ranger plugin to get db connector
+  jdk_location = config['hostLevelParams']['jdk_location']
+  java_share_dir = '/usr/share/java'
+  if stack_supports_ranger_audit_db:
+    if xa_audit_db_flavor and xa_audit_db_flavor == 'mysql':
+      jdbc_jar_name = default("/hostLevelParams/custom_mysql_jdbc_name", None)
+      audit_jdbc_url = format('jdbc:mysql://{xa_db_host}/{xa_audit_db_name}')
+      jdbc_driver = "com.mysql.jdbc.Driver"
+    elif xa_audit_db_flavor and xa_audit_db_flavor == 'oracle':
+      jdbc_jar_name = default("/hostLevelParams/custom_oracle_jdbc_name", None)
+      colon_count = xa_db_host.count(':')
+      if colon_count == 2 or colon_count == 0:
+        audit_jdbc_url = format('jdbc:oracle:thin:@{xa_db_host}')
+      else:
+        audit_jdbc_url = format('jdbc:oracle:thin:@//{xa_db_host}')
+      jdbc_driver = "oracle.jdbc.OracleDriver"
+    elif xa_audit_db_flavor and xa_audit_db_flavor == 'postgres':
+      jdbc_jar_name = default("/hostLevelParams/custom_postgres_jdbc_name", 
None)
+      audit_jdbc_url = 
format('jdbc:postgresql://{xa_db_host}/{xa_audit_db_name}')
+      jdbc_driver = "org.postgresql.Driver"
+    elif xa_audit_db_flavor and xa_audit_db_flavor == 'mssql':
+      jdbc_jar_name = default("/hostLevelParams/custom_mssql_jdbc_name", None)
+      audit_jdbc_url = 
format('jdbc:sqlserver://{xa_db_host};databaseName={xa_audit_db_name}')
+      jdbc_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    elif xa_audit_db_flavor and xa_audit_db_flavor == 'sqla':
+      jdbc_jar_name = default("/hostLevelParams/custom_sqlanywhere_jdbc_name", 
None)
+      audit_jdbc_url = 
format('jdbc:sqlanywhere:database={xa_audit_db_name};host={xa_db_host}')
+      jdbc_driver = "sap.jdbc4.sqlanywhere.IDriver"
+
+  downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+  driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+  driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") if 
stack_supports_ranger_audit_db else None
+
+  xa_audit_db_is_enabled = False
+  ranger_audit_solr_urls = 
config['configurations']['ranger-admin-site']['ranger.audit.solr.urls']
+  if xml_configurations_supported and stack_supports_ranger_audit_db:
+    xa_audit_db_is_enabled = 
config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']
+  xa_audit_hdfs_is_enabled = 
config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.hdfs']
 if xml_configurations_supported else None
+  ssl_keystore_password = 
unicode(config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'])
 if xml_configurations_supported else None
+  ssl_truststore_password = 
unicode(config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'])
 if xml_configurations_supported else None
+  credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if 
xml_configurations_supported else None
+
+  stack_version = get_stack_version('kafka-broker')
+  setup_ranger_env_sh_source = 
format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
+  setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")
+
+  #For SQLA explicitly disable audit to DB for Ranger
+  if xa_audit_db_flavor == 'sqla':
+    xa_audit_db_is_enabled = False
+
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = not len(namenode_hosts) == 0
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if 
has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] 
if has_namenode else None
+hdfs_principal_name = 
config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode 
else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if 
has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = 
get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', 
None))
+
+import functools
+#create partial functions with common arguments for every HdfsResource call
+#to create/delete hdfs directory/file/copyfromlocal we need to call 
params.HdfsResource in code
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = 
"/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources()
+)

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
new file mode 100644
index 0000000..0f3a417
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/service_check.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.validate import 
call_and_match_output
+from resource_management.libraries.functions.format import format
+from resource_management.core.logger import Logger
+from resource_management.core import sudo
+import subprocess
+
class ServiceCheck(Script):
  """Ambari service check for Kafka: verifies a test topic can be listed/created."""

  def service_check(self, env):
    """
    Run the Kafka service check: if the check topic does not already exist,
    create it via kafka-topics.sh and verify the command output.

    :param env: Ambari execution environment; populated with params.
    """
    import params
    env.set_params(params)

    # TODO, Kafka Service check should be more robust , It should get all the broker_hosts
    # Produce some messages and check if consumer reads same no.of messages.

    kafka_config = self.read_kafka_config()
    topic = "ambari_kafka_service_check"
    create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
    create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
    source_cmd = format("source {conf_dir}/kafka-env.sh")
    topic_exists_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --topic {topic} --list")
    topic_exists_cmd_p = subprocess.Popen(topic_exists_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    topic_exists_cmd_out, topic_exists_cmd_err = topic_exists_cmd_p.communicate()
    # run create topic command only if the topic doesn't exists
    if topic not in topic_exists_cmd_out:
      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
      command = source_cmd + " ; " + create_topic_cmd
      Logger.info("Running kafka create topic command: %s" % command)
      call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)

  def read_kafka_config(self):
    """
    Parse the broker's server.properties into a dict.

    :return: dict mapping property key -> raw property value (comments and
             blank lines skipped).
    """
    import params

    kafka_config = {}
    content = sudo.read_file(params.conf_dir + "/server.properties")
    for line in content.splitlines():
      if line.startswith("#") or not line.strip():
        continue

      # BUG FIX: split on the FIRST '=' only. Broker properties such as
      # listeners or sasl configs legitimately contain '=' in their values;
      # the original two-value unpack raised ValueError on those lines.
      key, value = line.split("=", 1)
      kafka_config[key] = value.replace("\n", "")

    return kafka_config

if __name__ == "__main__":
    ServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/setup_ranger_kafka.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/setup_ranger_kafka.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/setup_ranger_kafka.py
new file mode 100644
index 0000000..3a3ecfe
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/setup_ranger_kafka.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from resource_management.core.logger import Logger
+from resource_management.core.resources import File, Execute
+from resource_management.libraries.functions.format import format
+
def setup_ranger_kafka():
  """
  Enable and configure the Ranger authorization plugin for Kafka.

  Reads all inputs from the params module. No-op (with a log message) when
  Ranger admin is not installed. When HDFS audit is enabled, pre-creates the
  /ranger/audit/kafka directory tree in HDFS before installing the plugin.
  """
  import params

  if params.has_ranger_admin:

    from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin

    if params.retryAble:
      Logger.info("Kafka: Setup ranger: command retry enables thus retrying if ranger admin is down !")
    else:
      Logger.info("Kafka: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")

    # Pre-create the HDFS audit destination so the plugin can write audit logs.
    if params.xml_configurations_supported and params.enable_ranger_kafka and params.xa_audit_hdfs_is_enabled:
      if params.has_namenode:
        params.HdfsResource("/ranger/audit",
                           type="directory",
                           action="create_on_execute",
                           owner=params.hdfs_user,
                           group=params.hdfs_user,
                           mode=0755,
                           recursive_chmod=True
        )
        # Kafka-specific audit dir is private to the kafka user (0700).
        params.HdfsResource("/ranger/audit/kafka",
                           type="directory",
                           action="create_on_execute",
                           owner=params.kafka_user,
                           group=params.kafka_user,
                           mode=0700,
                           recursive_chmod=True
        )
        params.HdfsResource(None, action="execute")

    setup_ranger_plugin('kafka-broker', 'kafka',
                        params.downloaded_custom_connector, params.driver_curl_source,
                        params.driver_curl_target, params.java64_home,
                        params.repo_name, params.kafka_ranger_plugin_repo,
                        params.ranger_env, params.ranger_plugin_properties,
                        params.policy_user, params.policymgr_mgr_url,
                        params.enable_ranger_kafka, conf_dict=params.conf_dir,
                        component_user=params.kafka_user, component_group=params.user_group, cache_service_list=['kafka'],
                        plugin_audit_properties=params.ranger_kafka_audit, plugin_audit_attributes=params.ranger_kafka_audit_attrs,
                        plugin_security_properties=params.ranger_kafka_security, plugin_security_attributes=params.ranger_kafka_security_attrs,
                        plugin_policymgr_ssl_properties=params.ranger_kafka_policymgr_ssl, plugin_policymgr_ssl_attributes=params.ranger_kafka_policymgr_ssl_attrs,
                        component_list=['kafka-broker'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
                        credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password, 
                        ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
                        api_version = 'v2', skip_if_rangeradmin_down= not params.retryAble,
                        is_security_enabled = params.security_enabled,
                        is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos,
                        component_user_principal=params.kafka_jaas_principal if params.security_enabled else None,
                        component_user_keytab=params.kafka_keytab_path if params.security_enabled else None)
    
    # Stage the kafka-ranger-env.sh helper into the Kafka conf dir (once),
    # then fix its ownership/permissions for the kafka user.
    if params.enable_ranger_kafka: 
      Execute(('cp', '--remove-destination', params.setup_ranger_env_sh_source, params.setup_ranger_env_sh_target),
        not_if=format("test -f {setup_ranger_env_sh_target}"),
        sudo=True
      )
      File(params.setup_ranger_env_sh_target,
        owner = params.kafka_user,
        group = params.user_group,
        mode = 0755
      )
  else:
    Logger.info('Ranger admin not installed')

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/status_params.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/status_params.py
new file mode 100644
index 0000000..57bdf5e
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/status_params.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+
# Command context for this host; shared with params.py via `import status_params`.
config = Script.get_config()

# PID file location used by status checks and the Kafka start/stop scripts.
kafka_pid_dir = config['configurations']['kafka-env']['kafka_pid_dir']
kafka_pid_file = format("{kafka_pid_dir}/kafka.pid")

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/upgrade.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/upgrade.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/upgrade.py
new file mode 100644
index 0000000..b6e4046
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/upgrade.py
@@ -0,0 +1,78 @@
+
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+
+
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions import Direction
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+
def run_migration(env, upgrade_type):
  """
  If the acl migration script is present, then run it for either upgrade or downgrade.
  That script was introduced in HDP 2.3.4.0 and requires stopping all Kafka brokers first.
  Requires configs to be present.

  :param env: Environment.
  :param upgrade_type: "rolling" or "nonrolling"
  :raises Fail: if upgrade_type / upgrade_direction / downgrade_from_version
                (on downgrade) are missing, or zookeeper.connect is unset when
                the script needs to run.
  """
  import params

  if upgrade_type is None:
    raise Fail('Parameter "upgrade_type" is missing.')

  if params.upgrade_direction is None:
    raise Fail('Parameter "upgrade_direction" is missing.')

  if params.upgrade_direction == Direction.DOWNGRADE and params.downgrade_from_version is None:
    raise Fail('Parameter "downgrade_from_version" is missing.')

  # ACL migration only applies to kerberized clusters.
  if not params.security_enabled:
    Logger.info("Skip running the Kafka ACL migration script since cluster security is not enabled.")
    return
  
  Logger.info("Upgrade type: {0}, direction: {1}".format(str(upgrade_type), params.upgrade_direction))

  # If the schema upgrade script exists in the version upgrading to, then attempt to upgrade/downgrade it while still using the present bits.
  # NOTE: format() below resolves {stack_root}/{version}/{downgrade_from_version}
  # from the params module, not from local variables.
  kafka_acls_script = None
  command_suffix = ""
  if params.upgrade_direction == Direction.UPGRADE:
    kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh")
    command_suffix = "--upgradeAcls"
  elif params.upgrade_direction == Direction.DOWNGRADE:
    kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
    command_suffix = "--downgradeAcls"

  if kafka_acls_script is not None:
    if os.path.exists(kafka_acls_script):
      Logger.info("Found Kafka acls script: {0}".format(kafka_acls_script))
      if params.zookeeper_connect is None:
        raise Fail("Could not retrieve property kafka-broker/zookeeper.connect")

      acls_command = "{0} --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect={1} {2}".\
        format(kafka_acls_script, params.zookeeper_connect, command_suffix)

      Execute(acls_command,
              user=params.kafka_user,
              logoutput=True)
    else:
      # Older target version without the migration script: nothing to do.
      Logger.info("Did not find Kafka acls script: {0}".format(kafka_acls_script))

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/utils.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/utils.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/utils.py
new file mode 100644
index 0000000..2f1fa5e
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
def get_bare_principal(normalized_principal_name):
    """
    Given a normalized principal name (e.g. primary/instance.host@REALM)
    return just the primary component (e.g. "nimbus").

    :param normalized_principal_name: a string containing the principal name
        to process; may be None or empty
    :return: a string containing the primary component value, or None if the
        input is empty/None or does not match the expected principal format
    """
    bare_principal = None

    if normalized_principal_name:
        # Capture everything up to the first '/' or '@'; the instance and
        # realm components are optional and discarded.
        match = re.match(r"([^/@]+)(?:/[^@])?(?:@.*)?", normalized_principal_name)

        # The match check must stay nested inside the truthiness check:
        # referencing `match` when the input is falsy would raise
        # UnboundLocalError, since `match` is only assigned in that branch.
        if match:
            bare_principal = match.group(1)

    return bare_principal

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka.conf.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka.conf.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka.conf.j2
new file mode 100644
index 0000000..9e18e1d
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka.conf.j2
@@ -0,0 +1,35 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{kafka_user}}   - nofile   {{kafka_user_nofile_limit}}
+{{kafka_user}}   - nproc    {{kafka_user_nproc_limit}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_client_jaas.conf.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_client_jaas.conf.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_client_jaas.conf.j2
new file mode 100644
index 0000000..7f81d85
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_client_jaas.conf.j2
@@ -0,0 +1,29 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="zookeeper";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_jaas.conf.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_jaas.conf.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_jaas.conf.j2
new file mode 100644
index 0000000..56c558d
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/kafka_jaas.conf.j2
@@ -0,0 +1,41 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaServer {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="{{kafka_bare_jaas_principal}}"
+   principal="{{kafka_jaas_principal}}";
+};
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="zookeeper"
+   principal="{{kafka_jaas_principal}}";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/tools-log4j.properties.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/tools-log4j.properties.j2
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/tools-log4j.properties.j2
new file mode 100644
index 0000000..c4ad326
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/templates/tools-log4j.properties.j2
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=WARN, stderr
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stderr.Target=System.err
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/alerts.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/alerts.json 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/alerts.json
new file mode 100644
index 0000000..04fb583
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/alerts.json
@@ -0,0 +1,32 @@
+{
+  "KAFKA": {
+    "service": [],
+    "KAFKA_BROKER": [
+      {
+        "name": "kafka_broker_process",
+        "label": "Kafka Broker Process",
+        "description": "This host-level alert is triggered if the Kafka Broker 
cannot be determined to be up.",
+        "interval": 1,
+        "scope": "HOST",
+        "source": {
+          "type": "PORT",
+          "uri": "{{kafka-broker/listeners}}",
+          "default_port": 6667,
+          "reporting": {
+            "ok": {
+              "text": "TCP OK - {0:.3f}s response on port {1}"
+            },
+            "warning": {
+              "text": "TCP OK - {0:.3f}s response on port {1}",
+              "value": 1.5
+            },
+            "critical": {
+              "text": "Connection failed: {0} to {1}:{2}",
+              "value": 5.0
+            }
+          }
+        }
+      }
+    ]
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml
new file mode 100644
index 0000000..8802f13
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+
+<configuration supports_final="true" 
xmlns:xi="http://www.w3.org/2001/XInclude";>
+  <property>
+    <name>listeners</name>
+    <value>PLAINTEXT://localhost:6667</value>
+    <property-type>DONT_ADD_ON_UPGRADE</property-type>
+    <description>host and port where kafka broker will be accepting 
connections. localhost will be substituted with hostname.</description>
+  </property>
+  <property>
+    <name>controlled.shutdown.enable</name>
+    <value>true</value>
+    <description>Enable controlled shutdown of the broker. If enabled, the 
broker will move all leaders on it to some other brokers before shutting itself 
down. This reduces the unavailability window during shutdown.</description>
+  </property>
+  <property>
+    <name>auto.leader.rebalance.enable</name>
+    <value>true</value>
+    <description>Enables auto leader balancing. A background thread checks and 
triggers leader balance if required at regular intervals</description>
+  </property>
+  <property>
+    <name>num.recovery.threads.per.data.dir</name>
+    <value>1</value>
+    <description>The number of threads per data directory to be used for log 
recovery at startup and flushing at shutdown</description>
+  </property>
+  <property>
+    <name>min.insync.replicas</name>
+    <value>1</value>
+    <description>define the minimum number of replicas in ISR needed to 
satisfy a produce request with required.acks=-1 (or all)</description>
+  </property>
+  <property>
+    <name>leader.imbalance.per.broker.percentage</name>
+    <value>10</value>
+    <description>The ratio of leader imbalance allowed per broker. The 
controller would trigger a leader balance if it goes above this value per 
broker. The value is specified in percentage.</description>
+  </property>
+  <property>
+    <name>leader.imbalance.check.interval.seconds</name>
+    <value>300</value>
+    <description>The frequency with which the partition rebalance check is 
triggered by the controller</description>
+  </property>
+  <property>
+    <name>offset.metadata.max.bytes</name>
+    <value>4096</value>
+    <description>The maximum size for a metadata entry associated with an 
offset commit</description>
+  </property>
+  <property>
+    <name>offsets.load.buffer.size</name>
+    <value>5242880</value>
+    <description>Batch size for reading from the offsets segments when loading 
offsets into the cache.</description>
+  </property>
+  <property>
+    <name>offsets.topic.replication.factor</name>
+    <value>3</value>
+    <description>The replication factor for the offsets topic (set higher to 
ensure availability).
+    To ensure that the effective replication factor of the offsets topic is 
the configured value,
+    the number of alive brokers has to be at least the replication factor at 
the time of the
+    first request for the offsets topic. If not, either the offsets topic 
creation will fail or it will get a replication factor of min(alive brokers, 
configured replication factor).</description>
+  </property>
+  <property>
+    <name>offsets.topic.num.partitions</name>
+    <value>50</value>
+    <description>The number of partitions for the offset commit topic (should 
not change after deployment)</description>
+  </property>
+  <property>
+    <name>offsets.topic.segment.bytes</name>
+    <value>104857600</value>
+    <description>The offsets topic segment bytes should be kept relatively 
small in order to facilitate faster log compaction and cache loads</description>
+  </property>
+  <property>
+    <name>offsets.topic.compression.codec</name>
+    <value>0</value>
+    <description>Compression codec for the offsets topic - compression may be 
used to achieve \"atomic\" commits. Default is NoCompression. For Gzip add 
value 1 , SnappyCompression add value 2, LZ4CompressionCodec 3.
+    </description>
+  </property>
+  <property>
+    <name>offsets.retention.minutes</name>
+    <value>86400000</value>
+    <description>Log retention window in minutes for offsets 
topic</description>
+  </property>
+  <property>
+    <name>offsets.retention.check.interval.ms</name>
+    <value>600000</value>
+    <description>Frequency at which to check for stale offsets</description>
+  </property>
+  <property>
+    <name>offsets.commit.timeout.ms</name>
+    <value>5000</value>
+    <description>Offset commit will be delayed until all replicas for the 
offsets topic receive the commit or this timeout is reached. This is similar to 
the producer request timeout.</description>
+  </property>
+  <property>
+    <name>offsets.commit.required.acks</name>
+    <value>-1</value>
+    <description>The required acks before the commit can be accepted. In 
general, the default (-1) should not be overridden</description>
+  </property>
+  <property>
+    <name>delete.topic.enable</name>
+    <value>false</value>
+    <description>Enables delete topic. Delete topic through the admin tool 
will have no effect if this config is turned off</description>
+  </property>
+  <property>
+    <name>compression.type</name>
+    <description>Specify the final compression type for a given topic. This 
configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). 
It additionally accepts 'uncompressed' which is equivalent to no compression; 
and 'producer' which means retain the original compression codec set by the 
producer.</description>
+    <value>producer</value>
+  </property>
+  <property>
+    <name>port</name>
+    <value>6667</value>
+    <description>Deprecated config in favor of listeners config.</description>
+    <deleted>true</deleted>
+  </property>
+  <property>
+    <name>external.kafka.metrics.exclude.prefix</name>
+    
<value>kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec</value>
+    <description>
+      Exclude metrics starting with these prefixes from being collected.
+    </description>
+  </property>
+  <property>
+    <name>external.kafka.metrics.include.prefix</name>
+    
<value>kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request</value>
+    <description>
+      These metrics would be included even if the exclude prefix omits them.
+    </description>
+  </property>
+  <property>
+    <name>authorizer.class.name</name>
+    <description>
+      Kafka authorizer class
+    </description>
+    <depends-on>
+      <property>
+        <type>ranger-kafka-plugin-properties</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+    </depends-on>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-env.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-env.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-env.xml
new file mode 100644
index 0000000..c17793f
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-env.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+
+<configuration supports_adding_forbidden="true">
+  <property>
+    <name>is_supported_kafka_ranger</name>
+    <value>true</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-audit.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-audit.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-audit.xml
new file mode 100644
index 0000000..331daba
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-audit.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+  <property>
+    <name>xasecure.audit.is.enabled</name>
+    <value>true</value>
+    <description>Is Audit enabled?</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db</name>
+    <value>false</value>
+    <display-name>Audit to DB</display-name>
+    <description>Is Audit to DB enabled?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.db</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db.jdbc.url</name>
+    <value>{{audit_jdbc_url}}</value>
+    <description>Audit DB JDBC URL</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db.user</name>
+    <value>{{xa_audit_db_user}}</value>
+    <description>Audit DB JDBC User</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db.password</name>
+    <value>crypted</value>
+    <property-type>PASSWORD</property-type>
+    <description>Audit DB JDBC Password</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db.jdbc.driver</name>
+    <value>{{jdbc_driver}}</value>
+    <description>Audit DB JDBC Driver</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.credential.provider.file</name>
+    <value>jceks://file{{credential_file}}</value>
+    <description>Credential file store</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db.batch.filespool.dir</name>
+    <value>/var/log/kafka/audit/db/spool</value>
+    <description>/var/log/kafka/audit/db/spool</description>
+  </property>  
+
+  <property>
+    <name>xasecure.audit.destination.hdfs</name>
+    <value>true</value>
+    <display-name>Audit to HDFS</display-name>
+    <description>Is Audit to HDFS enabled?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.hdfs</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.hdfs.dir</name>
+    <value>hdfs://NAMENODE_HOSTNAME:8020/ranger/audit</value>
+    <description>HDFS folder to write audit to, make sure the service user has 
required permissions</description>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.hdfs.dir</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.hdfs.batch.filespool.dir</name>
+    <value>/var/log/kafka/audit/hdfs/spool</value>
+    <description>/var/log/kafka/audit/hdfs/spool</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.solr</name>
+    <value>false</value>
+    <display-name>Audit to SOLR</display-name>
+    <description>Is Solr audit enabled?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.solr</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.solr.urls</name>
+    <value></value>
+    <description>Solr URL</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-admin-site</type>
+        <name>ranger.audit.solr.urls</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.solr.zookeepers</name>
+    <value>NONE</value>
+    <description>Solr Zookeeper string</description>
+    <depends-on>
+      <property>
+        <type>ranger-admin-site</type>
+        <name>ranger.audit.solr.zookeepers</name>
+      </property>
+    </depends-on>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.solr.batch.filespool.dir</name>
+    <value>/var/log/kafka/audit/solr/spool</value>
+    <description>/var/log/kafka/audit/solr/spool</description>
+  </property>
+
+  <property>
+    <name>xasecure.audit.provider.summary.enabled</name>
+    <value>true</value>
+    <display-name>Audit provider summary enabled</display-name>
+    <description>Enable Summary audit?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+  </property>  
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-plugin-properties.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-plugin-properties.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-plugin-properties.xml
new file mode 100644
index 0000000..18429a1
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-plugin-properties.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="true">
+
+  <property>
+    <name>policy_user</name>
+    <value>ambari-qa</value>
+    <display-name>Policy user for KAFKA</display-name>
+    <description>This user must be system user and also present at Ranger 
admin portal</description>
+  </property> 
+
+  <property>
+    <name>hadoop.rpc.protection</name>
+    <value></value>
+    <description>Used for repository creation on ranger admin</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+  </property>
+
+  <property>
+    <name>common.name.for.certificate</name>
+    <value></value>
+    <description>Common name for certificate, this value should match what is 
specified in repo within ranger admin</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+  </property>
+  
+  <property>
+    <name>zookeeper.connect</name>
+    <value>localhost:2181</value>
+    <property-type>DONT_ADD_ON_UPGRADE</property-type>
+    <description>Used for repository creation on ranger admin</description>
+  </property>
+
+  <property>
+    <name>ranger-kafka-plugin-enabled</name>
+    <value>No</value>
+    <display-name>Enable Ranger for KAFKA</display-name>
+    <description>Enable ranger kafka plugin</description>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+    </depends-on>
+    <value-attributes>
+      <type>boolean</type>
+      <overridable>false</overridable>
+    </value-attributes>
+  </property>
+
+  <property>
+    <name>REPOSITORY_CONFIG_USERNAME</name>
+    <value>kafka</value>
+    <display-name>Ranger repository config user</display-name>
+    <description>Used for repository creation on ranger admin</description>
+  </property>
+
+  <property>
+    <name>REPOSITORY_CONFIG_PASSWORD</name>
+    <value>kafka</value>
+    <property-type>PASSWORD</property-type>
+    <display-name>Ranger repository config password</display-name>
+    <description>Used for repository creation on ranger admin</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-policymgr-ssl.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-policymgr-ssl.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-policymgr-ssl.xml
new file mode 100644
index 0000000..e1977c9
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-policymgr-ssl.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore</name>
+    <value>kafkadev-clientcert.jks</value>
+    <description>Java keystore file</description>
+  </property>
+
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore.password</name>
+    <value>myKeyFilePassword</value>
+    <property-type>PASSWORD</property-type>
+    <description>Password for the keystore</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+  </property>
+
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore</name>
+    <value>cacerts-xasecure.jks</value>
+    <description>Java truststore file</description>
+  </property>
+
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore.password</name>
+    <value>changeit</value>
+    <property-type>PASSWORD</property-type>
+    <description>Java truststore password</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+  </property>
+
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore.credential.file</name>
+    <value>jceks://file/{{credential_file}}</value>
+    <description>Java keystore credential file</description>
+  </property>
+
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore.credential.file</name>
+    <value>jceks://file/{{credential_file}}</value>
+    <description>Java truststore credential file</description>
+  </property>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-security.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-security.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-security.xml
new file mode 100644
index 0000000..9dd858b
--- /dev/null
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/ranger-kafka-security.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  
+  <property>
+    <name>ranger.plugin.kafka.service.name</name>
+    <value>{{repo_name}}</value>
+    <description>Name of the Ranger service containing policies for this Kafka 
instance</description>
+  </property>
+
+  <property>
+    <name>ranger.plugin.kafka.policy.source.impl</name>
+    <value>org.apache.ranger.admin.client.RangerAdminRESTClient</value>
+    <description>Class to retrieve policies from the source</description>
+  </property>
+
+  <property>
+    <name>ranger.plugin.kafka.policy.rest.url</name>
+    <value>{{policymgr_mgr_url}}</value>
+    <description>URL to Ranger Admin</description>
+  </property>
+
+  <property>
+    <name>ranger.plugin.kafka.policy.rest.ssl.config.file</name>
+    <value>/etc/kafka/conf/ranger-policymgr-ssl.xml</value>
+    <description>Path to the file containing SSL details to contact Ranger 
Admin</description>
+  </property>
+
+  <property>
+    <name>ranger.plugin.kafka.policy.pollIntervalMs</name>
+    <value>30000</value>
+    <description>How often, in milliseconds, to poll for changes in policies</description>
+  </property>
+
+  <property>
+    <name>ranger.plugin.kafka.policy.cache.dir</name>
+    <value>/etc/ranger/{{repo_name}}/policycache</value>
+    <description>Directory where Ranger policies are cached after successful 
retrieval from the source</description>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
new file mode 100644
index 0000000..eaa3d9d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
@@ -0,0 +1,49 @@
+{
+  "services": [
+    {
+      "name": "KAFKA",
+      "identities": [
+        {
+          "name": "/smokeuser"
+        }
+      ],
+      "configurations": [
+        {
+          "kafka-broker": {
+              "authorizer.class.name": 
"kafka.security.auth.SimpleAclAuthorizer",
+              
"principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
+              "super.users": "user:${kafka-env/kafka_user}",
+              "security.inter.broker.protocol": "PLAINTEXTSASL"
+          }
+        }
+      ],
+      "components": [
+        {
+          "name": "KAFKA_BROKER",
+          "identities": [
+            {
+              "name": "kafka_broker",
+              "principal": {
+                "value": "${kafka-env/kafka_user}/_HOST@${realm}",
+                "type": "service",
+                "configuration": "kafka-env/kafka_principal_name"
+              },
+              "keytab": {
+                "file": "${keytab_dir}/kafka.service.keytab",
+                "owner": {
+                  "name": "${kafka-env/kafka_user}",
+                  "access": "r"
+                },
+                "group": {
+                  "name": "${cluster-env/user_group}",
+                  "access": ""
+                },
+                "configuration": "kafka-env/kafka_keytab"
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metainfo.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metainfo.xml 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metainfo.xml
new file mode 100644
index 0000000..74b232b
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metainfo.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<metainfo>
+  <schemaVersion>2.0</schemaVersion>
+  <services>
+    <service>
+      <name>KAFKA</name>
+      <version>0.9.0</version>
+      <extends>common-services/KAFKA/0.8.1</extends>
+      <configuration-dependencies>
+        <config-type>kafka-broker</config-type>
+      </configuration-dependencies>
+    </service>
+  </services>
+</metainfo>

http://git-wip-us.apache.org/repos/asf/ambari/blob/aaaa9884/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metrics.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metrics.json 
b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metrics.json
new file mode 100644
index 0000000..e99f4eb
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/metrics.json
@@ -0,0 +1,239 @@
+{
+  "KAFKA_BROKER": {
+    "Component": [
+      {
+        "type": "ganglia",
+        "metrics": {
+          "default": {
+            "metrics/jvm/uptime": {
+              "metric": "jvm.uptime",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/heap_usage": {
+              "metric": "jvm.heap_usage",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/non_heap_usage": {
+              "metric": "jvm.non_heap_usage",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/runnable": {
+              "metric": "jvm.thread-states.runnable",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/blocked": {
+              "metric": "jvm.thread-states.blocked",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/timed_waiting": {
+              "metric": "jvm.thread-states.timed_waiting",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/terminated": {
+              "metric": "jvm.thread-states.terminated",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread_count": {
+              "metric": "jvm.thread_count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/daemon_thread_count": {
+              "metric": "jvm.daemon_thread_count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate":
 {
+              "metric": 
"kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/5MinuteRate":
 {
+              "metric": 
"kafka.server.BrokerTopicMetrics.MessagesInPerSec.5MinuteRate",
+              "pointInTime": false,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/15MinuteRate":
 {
+              "metric": 
"kafka.server.BrokerTopicMetrics.MessagesInPerSec.15MinuteRate",
+              "pointInTime": false,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/meanRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.MessagesInPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/count": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.MessagesInPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/5MinuteRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesInPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/15MinuteRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesInPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/meanRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesInPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/count": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/5MinuteRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesOutPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/15MinuteRate": 
{
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesOutPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/meanRate": {
+              "metric": 
"kafka.server.BrokerTopicMetrics.BytesOutPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/count": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/KafkaController/ActiveControllerCount": {
+              "metric": 
"kafka.controller.KafkaController.ActiveControllerCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/meanRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/5MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/15MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/count": {
+              "metric": 
"kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/5MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            
"metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/15MinuteRate":
 {
+              "metric": 
"kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/OfflinePartitionsCount": 
{
+              "metric": 
"kafka.controller.ControllerStats.OfflinePartitionsCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/PartitionCount": {
+              "metric": "kafka.server.ReplicaManager.PartitionCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/LeaderCount": {
+              "metric": "kafka.server.ReplicaManager.LeaderCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions": {
+              "metric": 
"kafka.server.ReplicaManager.UnderReplicatedPartitions",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/ISRShrinksPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRShrinksPerSec",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/ISRExpandsPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRExpandsPerSec",
+              "pointInTime": true,
+              "temporal": true
+            },
+
+            "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag": {
+              "metric": 
"kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ProducerRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.ProducerRequestPurgatory.PurgatorySize",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/FetchRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.FetchRequestPurgatory.PurgatorySize",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/cluster/Partition/$1-UnderReplicated": {
+              "metric": "kafka.cluster.Partition.(\\w+)-UnderReplicated",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/consumer/ConsumerFetcherManager/$1-MaxLag": {
+              "metric": "kafka.consumer.ConsumerFetcherManager.(\\w+)-MaxLag",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/consumer/ConsumerFetcherManager/$1-MinFetch": {
+              "metric": 
"kafka.consumer.ConsumerFetcherManager.(\\w+)-MinFetch",
+              "pointInTime": true,
+              "temporal": true
+            }
+          }
+        }
+      }
+    ]
+  }
+}

Reply via email to