Repository: incubator-ranger
Updated Branches:
  refs/heads/master 60cae1a74 -> a6b93f0e9


Initial implementation for KafkaAuditProvider

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/72934e06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/72934e06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/72934e06

Branch: refs/heads/master
Commit: 72934e061e40c1708efdc06551349cb2d9345051
Parents: 60cae1a
Author: Don Bosco Durai <[email protected]>
Authored: Mon Feb 23 13:29:30 2015 -0800
Committer: Don Bosco Durai <[email protected]>
Committed: Mon Feb 23 13:29:30 2015 -0800

----------------------------------------------------------------------
 agents-audit/pom.xml                            |   5 +
 .../audit/provider/AuditProviderFactory.java    |  18 ++-
 .../audit/provider/BaseAuditProvider.java       |  24 ++-
 .../provider/kafka/KafkaAuditProvider.java      | 148 +++++++++++++++++++
 4 files changed, 193 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/pom.xml
----------------------------------------------------------------------
diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml
index b437bc1..e54ec36 100644
--- a/agents-audit/pom.xml
+++ b/agents-audit/pom.xml
@@ -62,5 +62,10 @@
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
     </dependency>
+    <!-- Kafka client library; KafkaAuditProvider uses its producer API -->
+    <dependency>
+       <groupId>org.apache.kafka</groupId>
+       <artifactId>kafka_2.10</artifactId>
+       <version>0.8.2.0</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index fb5e8b5..9fbe29f 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;
+import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 
 
 /*
@@ -41,6 +42,7 @@ public class AuditProviderFactory {
        private static final String AUDIT_DB_IS_ENABLED_PROP    = 
"xasecure.audit.db.is.enabled" ;
        private static final String AUDIT_HDFS_IS_ENABLED_PROP  = 
"xasecure.audit.hdfs.is.enabled";
        private static final String AUDIT_LOG4J_IS_ENABLED_PROP = 
"xasecure.audit.log4j.is.enabled" ;
+       private static final String AUDIT_KAFKA_IS_ENABLED_PROP = 
"xasecure.audit.kafka.is.enabled";
        
        private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 
1024;
        private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 
1000;
@@ -96,8 +98,9 @@ public class AuditProviderFactory {
                boolean isAuditToDbEnabled    = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
                boolean isAuditToHdfsEnabled  = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
                boolean isAuditToLog4jEnabled = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
+               boolean isAuditToKafkaEnabled  = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false);
 
-               if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled 
|| isAuditToLog4jEnabled)) {
+               if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled 
|| isAuditToKafkaEnabled || isAuditToLog4jEnabled)) {
                        LOG.info("AuditProviderFactory: Audit not enabled..");
 
                        mProvider = getDefaultProvider();
@@ -141,6 +144,19 @@ public class AuditProviderFactory {
                        }
                }
 
+               if(isAuditToKafkaEnabled) {
+                       LOG.info("KafkaAuditProvider is enabled");
+                       KafkaAuditProvider kafkaProvider = new 
KafkaAuditProvider();
+                       kafkaProvider.init(props);
+                       
+                       if( kafkaProvider.isAsync()) {
+                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), 
kafkaProvider.getMaxFlushInterval(), kafkaProvider);
+                               providers.add(asyncProvider);
+                       } else {
+                               providers.add(kafkaProvider);
+                       }
+               }
+               
                if(isAuditToLog4jEnabled) {
                        Log4jAuditProvider log4jProvider = new 
Log4jAuditProvider();
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
index 9a0cc45..14e6220 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
@@ -20,7 +20,6 @@ package org.apache.ranger.audit.provider;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,13 +29,18 @@ public abstract class BaseAuditProvider implements 
AuditProvider {
        private static final Log LOG = 
LogFactory.getLog(BaseAuditProvider.class);
 
        private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP 
= "xasecure.audit.log.failure.report.min.interval.ms";
+       public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 
1024;
+       public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 
1000;
 
        private int   mLogFailureReportMinIntervalInMs = 60 * 1000;
 
        private AtomicLong mFailedLogLastReportTime       = new AtomicLong(0);
        private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
        private AtomicLong mFailedLogCountLifeTime        = new AtomicLong(0);
+       private int maxQueueSize     =  AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT;
+       private int maxFlushInterval = AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT;
 
+       
 
        public BaseAuditProvider() {
        }
@@ -46,6 +50,7 @@ public abstract class BaseAuditProvider implements 
AuditProvider {
                LOG.info("BaseAuditProvider.init()");
 
                mLogFailureReportMinIntervalInMs = getIntProperty(props, 
AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+
        }
 
        public void logFailedEvent(AuditEventBase event) {
@@ -75,6 +80,23 @@ public abstract class BaseAuditProvider implements 
AuditProvider {
                }
        }
 
+       
+       /**
+        * Returns the configured maximum queue size.
+        *
+        * @return the current value; AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT unless
+        *         setMaxQueueSize() has been called
+        */
+       public int getMaxQueueSize() {
+               return maxQueueSize;
+       }
+
+       /**
+        * Overrides the maximum queue size reported by getMaxQueueSize().
+        *
+        * @param maxQueueSize the new value; stored as given, no validation is done here
+        */
+       public void setMaxQueueSize(int maxQueueSize) {
+               this.maxQueueSize = maxQueueSize;
+       }
+
+       /**
+        * Returns the configured maximum flush interval.
+        *
+        * @return the current value; AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT unless
+        *         setMaxFlushInterval() has been called
+        */
+       public int getMaxFlushInterval() {
+               return maxFlushInterval;
+       }
+
+       /**
+        * Overrides the maximum flush interval reported by getMaxFlushInterval().
+        *
+        * @param maxFlushInterval the new value; stored as given, no validation is done here
+        */
+       public void setMaxFlushInterval(int maxFlushInterval) {
+               this.maxFlushInterval = maxFlushInterval;
+       }
+
        public static Map<String, String> getPropertiesWithPrefix(Properties 
props, String prefix) {
                Map<String, String> prefixedProperties = new HashMap<String, 
String>();
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
new file mode 100644
index 0000000..a6faba4
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -0,0 +1,148 @@
+package org.apache.ranger.audit.provider.kafka;
+
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * Audit provider that publishes audit events to a Kafka topic.
+ *
+ * Each event is turned into a string with MiscUtil.stringify() and sent,
+ * without a partition key, to the "ranger_audits" topic through the Kafka
+ * producer API. If the producer could not be created, events are written to
+ * this class's logger instead.
+ */
+public class KafkaAuditProvider extends BaseAuditProvider {
+       private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);
+
+       /** Property holding the maximum queue size exposed via getMaxQueueSize(). */
+       public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size";
+       /** Property holding the maximum flush interval exposed via getMaxFlushInterval(). */
+       public static final String AUDIT_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.kafka.async.max.flush.interval.ms";
+       /** Property holding the broker list handed to the producer as "metadata.broker.list". */
+       public static final String AUDIT_KAFKA_BROKER_LIST = "xasecure.audit.kafka.broker_list";
+
+       // Set once the producer has been created so that a repeated init() does not build another one.
+       boolean initDone = false;
+
+       // Stays null when producer creation failed; log() then falls back to the local logger.
+       Producer<String, String> producer = null;
+       String topic = null;
+
+       /**
+        * Reads the Kafka audit settings from {@code props} and, on the first
+        * successful call, creates the Kafka producer.
+        *
+        * The broker list falls back to "localhost:9092" when the property is
+        * missing or empty. Any failure while creating the producer is logged and
+        * swallowed, leaving {@code producer} null.
+        *
+        * @param props configuration properties to read from
+        */
+       @Override
+       public void init(Properties props) {
+               LOG.info("init() called");
+               super.init(props);
+
+               setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
+                               AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
+               // The flush interval has its own property key; it must not be read
+               // from the queue-size key.
+               setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
+                               AUDIT_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
+               topic = "ranger_audits";
+
+               try {
+                       if (!initDone) {
+                               String brokerList = BaseAuditProvider.getStringProperty(props,
+                                               AUDIT_KAFKA_BROKER_LIST);
+                               if (brokerList == null || brokerList.isEmpty()) {
+                                       brokerList = "localhost:9092";
+                               }
+
+                               Properties kafkaProps = new Properties();
+
+                               kafkaProps.put("metadata.broker.list", brokerList);
+                               kafkaProps.put("serializer.class", "kafka.serializer.StringEncoder");
+                               // kafkaProps.put("partitioner.class",
+                               // "example.producer.SimplePartitioner");
+                               kafkaProps.put("request.required.acks", "1");
+                               ProducerConfig kafkaConfig = new ProducerConfig(kafkaProps);
+                               producer = new Producer<String, String>(kafkaConfig);
+                               initDone = true;
+                       }
+               } catch (Throwable t) {
+                       // Best effort: a Kafka failure must not prevent the provider from being constructed.
+                       LOG.fatal("Error initializing kafka:", t);
+               }
+       }
+
+       /**
+        * Sends one audit event to the Kafka topic.
+        *
+        * For AuthzAuditEvent instances the agent hostname, log type and event id
+        * are filled in first when they are null. Send failures are logged,
+        * including the cause, and are not propagated to the caller.
+        *
+        * @param event the audit event to publish
+        */
+       @Override
+       public void log(AuditEventBase event) {
+               if (event instanceof AuthzAuditEvent) {
+                       AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
+
+                       if (authzEvent.getAgentHostname() == null) {
+                               authzEvent.setAgentHostname(MiscUtil.getHostname());
+                       }
+
+                       if (authzEvent.getLogType() == null) {
+                               authzEvent.setLogType("RangerAudit");
+                       }
+
+                       if (authzEvent.getEventId() == null) {
+                               authzEvent.setEventId(MiscUtil.generateUniqueId());
+                       }
+               }
+
+               String message = MiscUtil.stringify(event);
+               try {
+                       if (producer != null) {
+                               // TODO: Add partition key
+                               KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(
+                                               topic, message);
+                               producer.send(keyedMessage);
+                       } else {
+                               // No producer available: keep the event visible in the local log.
+                               LOG.info("AUDIT LOG (Kafka Down):" + message);
+                       }
+               } catch (Throwable t) {
+                       // Pass the throwable along so the cause of the failure is not lost.
+                       LOG.error("Error sending message to Kafka topic. topic=" + topic
+                                       + ", message=" + message, t);
+               }
+       }
+
+       /** Logs the call; there is nothing to start for this provider. */
+       @Override
+       public void start() {
+               LOG.info("start() called");
+               // TODO Auto-generated method stub
+
+       }
+
+       /**
+        * Closes the Kafka producer if one was created. A failure while closing is
+        * logged, including the cause, and is not propagated.
+        */
+       @Override
+       public void stop() {
+               LOG.info("stop() called");
+               if (producer != null) {
+                       try {
+                               producer.close();
+                       } catch (Throwable t) {
+                               LOG.error("Error closing Kafka producer", t);
+                       }
+               }
+       }
+
+       /** Logs the call; this provider does not wait for anything. */
+       @Override
+       public void waitToComplete() {
+               LOG.info("waitToComplete() called");
+               // TODO Auto-generated method stub
+
+       }
+
+       /**
+        * Logs the call.
+        *
+        * @return always {@code false}
+        */
+       @Override
+       public boolean isFlushPending() {
+               LOG.info("isFlushPending() called");
+               return false;
+       }
+
+       /**
+        * Logs the call.
+        *
+        * @return always {@code 0}
+        */
+       @Override
+       public long getLastFlushTime() {
+               LOG.info("getLastFlushTime() called");
+
+               return 0;
+       }
+
+       /** Logs the call; this provider performs no flush work of its own. */
+       @Override
+       public void flush() {
+               LOG.info("flush() called");
+
+       }
+
+       /**
+        * Reports whether this provider should be run asynchronously.
+        *
+        * @return always {@code true}
+        */
+       public boolean isAsync() {
+               return true;
+       }
+
+}

Reply via email to