Repository: incubator-ranger Updated Branches: refs/heads/master 60cae1a74 -> a6b93f0e9
Initial implemenation for KafkaAuditProvider Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/72934e06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/72934e06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/72934e06 Branch: refs/heads/master Commit: 72934e061e40c1708efdc06551349cb2d9345051 Parents: 60cae1a Author: Don Bosco Durai <[email protected]> Authored: Mon Feb 23 13:29:30 2015 -0800 Committer: Don Bosco Durai <[email protected]> Committed: Mon Feb 23 13:29:30 2015 -0800 ---------------------------------------------------------------------- agents-audit/pom.xml | 5 + .../audit/provider/AuditProviderFactory.java | 18 ++- .../audit/provider/BaseAuditProvider.java | 24 ++- .../provider/kafka/KafkaAuditProvider.java | 148 +++++++++++++++++++ 4 files changed, 193 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/pom.xml ---------------------------------------------------------------------- diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml index b437bc1..e54ec36 100644 --- a/agents-audit/pom.xml +++ b/agents-audit/pom.xml @@ -62,5 +62,10 @@ <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.2.0</version> +</dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 
fb5e8b5..9fbe29f 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -25,6 +25,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider; +import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider; /* @@ -41,6 +42,7 @@ public class AuditProviderFactory { private static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled" ; private static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled"; private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ; + private static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled"; private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; @@ -96,8 +98,9 @@ public class AuditProviderFactory { boolean isAuditToDbEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false); boolean isAuditToHdfsEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false); boolean isAuditToLog4jEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false); + boolean isAuditToKafkaEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false); - if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToLog4jEnabled)) { + if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled)) { LOG.info("AuditProviderFactory: Audit not enabled.."); mProvider = getDefaultProvider(); @@ -141,6 +144,19 @@ public class AuditProviderFactory { } } + if(isAuditToKafkaEnabled) { + LOG.info("KafkaAuditProvider is enabled"); + 
KafkaAuditProvider kafkaProvider = new KafkaAuditProvider(); + kafkaProvider.init(props); + + if( kafkaProvider.isAsync()) { + AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), kafkaProvider.getMaxFlushInterval(), kafkaProvider); + providers.add(asyncProvider); + } else { + providers.add(kafkaProvider); + } + } + if(isAuditToLog4jEnabled) { Log4jAuditProvider log4jProvider = new Log4jAuditProvider(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java index 9a0cc45..14e6220 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java @@ -20,7 +20,6 @@ package org.apache.ranger.audit.provider; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; - import java.util.concurrent.atomic.AtomicLong; import java.util.HashMap; import java.util.Map; @@ -30,13 +29,18 @@ public abstract class BaseAuditProvider implements AuditProvider { private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class); private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms"; + public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; + public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; private int mLogFailureReportMinIntervalInMs = 60 * 1000; private AtomicLong mFailedLogLastReportTime = new AtomicLong(0); private AtomicLong mFailedLogCountSinceLastReport = new 
AtomicLong(0); private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0); + private int maxQueueSize = AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT; + private int maxFlushInterval = AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT; + public BaseAuditProvider() { } @@ -46,6 +50,7 @@ public abstract class BaseAuditProvider implements AuditProvider { LOG.info("BaseAuditProvider.init()"); mLogFailureReportMinIntervalInMs = getIntProperty(props, AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000); + } public void logFailedEvent(AuditEventBase event) { @@ -75,6 +80,23 @@ public abstract class BaseAuditProvider implements AuditProvider { } } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public int getMaxFlushInterval() { + return maxFlushInterval; + } + + public void setMaxFlushInterval(int maxFlushInterval) { + this.maxFlushInterval = maxFlushInterval; + } + public static Map<String, String> getPropertiesWithPrefix(Properties props, String prefix) { Map<String, String> prefixedProperties = new HashMap<String, String>(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/72934e06/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java new file mode 100644 index 0000000..a6faba4 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java @@ -0,0 +1,148 @@ +package org.apache.ranger.audit.provider.kafka; + +import java.util.Properties; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.audit.provider.BaseAuditProvider;
import org.apache.ranger.audit.provider.MiscUtil;

/**
 * Audit provider that publishes Ranger audit events to a Kafka topic.
 *
 * Events are stringified via {@link MiscUtil#stringify} and sent to the
 * hard-coded topic "ranger_audits" using the Kafka 0.8 high-level producer.
 * If the producer could not be created (e.g. Kafka is down at init time),
 * events are written to the local log instead of being dropped silently.
 *
 * Thread-safety: not documented by this class; it is normally wrapped in an
 * AsyncAuditProvider (see isAsync()), which serializes calls to log().
 */
public class KafkaAuditProvider extends BaseAuditProvider {
	private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);

	/** Max queue size for the async wrapper around this provider. */
	public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size";
	/** Max flush interval (ms) for the async wrapper around this provider. */
	public static final String AUDIT_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.kafka.async.max.flush.interval.ms";
	/** Comma-separated Kafka broker list; defaults to localhost:9092. */
	public static final String AUDIT_KAFKA_BROKER_LIST = "xasecure.audit.kafka.broker_list";

	// Guards against re-creating the producer if init() is called again.
	boolean initDone = false;

	Producer<String, String> producer = null;
	String topic = null;

	/**
	 * Reads queue/flush configuration and creates the Kafka producer.
	 * Producer-creation failures are logged, not rethrown: the provider then
	 * degrades to logging audit messages locally (see log()).
	 *
	 * @param props configuration properties (xasecure.audit.kafka.*)
	 */
	@Override
	public void init(Properties props) {
		LOG.info("init() called");
		super.init(props);

		setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
		// BUG FIX: flush interval was previously read from
		// AUDIT_MAX_QUEUE_SIZE_PROP (copy-paste error), which made the
		// configured flush interval impossible to set.
		setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
				AUDIT_MAX_FLUSH_INTERVAL_PROP,
				AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
		topic = "ranger_audits";

		try {
			if (!initDone) {
				String brokerList = BaseAuditProvider.getStringProperty(props,
						AUDIT_KAFKA_BROKER_LIST);
				if (brokerList == null || brokerList.isEmpty()) {
					brokerList = "localhost:9092";
				}

				Properties kafkaProps = new Properties();

				kafkaProps.put("metadata.broker.list", brokerList);
				kafkaProps.put("serializer.class",
						"kafka.serializer.StringEncoder");
				// kafkaProps.put("partitioner.class",
				// "example.producer.SimplePartitioner");
				// acks=1: leader acknowledgment only (at-least-once on leader).
				kafkaProps.put("request.required.acks", "1");
				ProducerConfig kafkaConfig = new ProducerConfig(kafkaProps);
				producer = new Producer<String, String>(kafkaConfig);
				initDone = true;
			}
		} catch (Throwable t) {
			LOG.fatal("Error initializing kafka:", t);
		}
	}

	/**
	 * Sends one audit event to Kafka. For AuthzAuditEvent, fills in hostname,
	 * log type and event id if the caller left them null. If the producer is
	 * unavailable, the message is written to the local log so it is not lost.
	 *
	 * @param event the audit event to publish; stringified before sending
	 */
	@Override
	public void log(AuditEventBase event) {
		if (event instanceof AuthzAuditEvent) {
			AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;

			if (authzEvent.getAgentHostname() == null) {
				authzEvent.setAgentHostname(MiscUtil.getHostname());
			}

			if (authzEvent.getLogType() == null) {
				authzEvent.setLogType("RangerAudit");
			}

			if (authzEvent.getEventId() == null) {
				authzEvent.setEventId(MiscUtil.generateUniqueId());
			}
		}

		String message = MiscUtil.stringify(event);
		try {

			if (producer != null) {
				// TODO: Add partition key
				KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(
						topic, message);
				producer.send(keyedMessage);
			} else {
				LOG.info("AUDIT LOG (Kafka Down):" + message);
			}
		} catch (Throwable t) {
			// BUG FIX: pass t to the logger so the failure cause is preserved
			// (previously only the message text was logged).
			LOG.error("Error sending message to Kafka topic. topic=" + topic
					+ ", message=" + message, t);
		}
	}

	/** No-op: the producer is created eagerly in init(). */
	@Override
	public void start() {
		LOG.info("start() called");
		// TODO Auto-generated method stub

	}

	/** Closes the Kafka producer, swallowing (but logging) any failure. */
	@Override
	public void stop() {
		LOG.info("stop() called");
		if (producer != null) {
			try {
				producer.close();
			} catch (Throwable t) {
				// BUG FIX: include t so the close failure is diagnosable.
				LOG.error("Error closing Kafka producer", t);
			}
		}
	}

	/** No-op: sends are synchronous from this provider's point of view. */
	@Override
	public void waitToComplete() {
		LOG.info("waitToComplete() called");
		// TODO Auto-generated method stub

	}

	/** Always false: this provider does not buffer events itself. */
	@Override
	public boolean isFlushPending() {
		LOG.info("isFlushPending() called");
		return false;
	}

	/** Always 0: flush time is not tracked (nothing is buffered here). */
	@Override
	public long getLastFlushTime() {
		LOG.info("getLastFlushTime() called");

		return 0;
	}

	/** No-op: there is no local buffer to flush. */
	@Override
	public void flush() {
		LOG.info("flush() called");

	}

	/**
	 * Always true, so AuditProviderFactory wraps this provider in an
	 * AsyncAuditProvider and log() never blocks the authorization path.
	 */
	public boolean isAsync() {
		return true;
	}

}
