Incorporated Madhan's feedback

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a6b93f0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a6b93f0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a6b93f0e

Branch: refs/heads/master
Commit: a6b93f0e924ad8e0da090568e442f99618b77e0c
Parents: 72934e0
Author: Don Bosco Durai <[email protected]>
Authored: Mon Feb 23 14:24:11 2015 -0800
Committer: Don Bosco Durai <[email protected]>
Committed: Mon Feb 23 14:24:11 2015 -0800

----------------------------------------------------------------------
 .../provider/kafka/KafkaAuditProvider.java      | 27 +++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a6b93f0e/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index a6faba4..54e73ea 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.ranger.audit.provider.kafka;
 
 import java.util.Properties;
@@ -19,6 +35,7 @@ public class KafkaAuditProvider extends BaseAuditProvider {
        public static final String AUDIT_MAX_QUEUE_SIZE_PROP = 
"xasecure.audit.kafka.async.max.queue.size";
        public static final String AUDIT_MAX_FLUSH_INTERVAL_PROP = 
"xasecure.audit.kafka.async.max.flush.interval.ms";
        public static final String AUDIT_KAFKA_BROKER_LIST = 
"xasecure.audit.kafka.broker_list";
+       public static final String AUDIT_KAFKA_TOPIC_NAME = 
"xasecure.audit.kafka.topic_name";
        boolean initDone = false;
 
        Producer<String, String> producer = null;
@@ -34,7 +51,11 @@ public class KafkaAuditProvider extends BaseAuditProvider {
                setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
                                AUDIT_MAX_QUEUE_SIZE_PROP,
                                AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
-               topic = "ranger_audits";
+               topic = BaseAuditProvider.getStringProperty(props,
+                               AUDIT_KAFKA_TOPIC_NAME);
+               if (topic == null || topic.isEmpty()) {
+                       topic = "ranger_audits";
+               }
 
                try {
                        if (!initDone) {
@@ -52,6 +73,10 @@ public class KafkaAuditProvider extends BaseAuditProvider {
                                // kakfaProps.put("partitioner.class",
                                // "example.producer.SimplePartitioner");
                                kakfaProps.put("request.required.acks", "1");
+
+                               LOG.info("Connecting to Kafka producer using 
properties:"
+                                               + kakfaProps.toString());
+
                                ProducerConfig kafkaConfig = new 
ProducerConfig(kakfaProps);
                                producer = new Producer<String, 
String>(kafkaConfig);
                                initDone = true;

Reply via email to