RANGER-397 - Implement reliable streaming audits to configurable
destinations

First cut with HDFS and File destination working

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/eb1e9b5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/eb1e9b5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/eb1e9b5c

Branch: refs/heads/master
Commit: eb1e9b5c447dc3960d132bd2dc5adfbb33d5cb2c
Parents: 917833c
Author: Don Bosco Durai <bo...@apache.org>
Authored: Tue Apr 14 18:00:46 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Tue Apr 14 18:02:57 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../ranger/audit/model/AuditEventBase.java      |  14 +-
 .../ranger/audit/model/AuthzAuditEvent.java     |  41 +-
 .../audit/provider/AsyncAuditProvider.java      |   5 +-
 .../ranger/audit/provider/AuditAsyncQueue.java  | 167 ++++
 .../audit/provider/AuditBatchProcessor.java     | 327 +++++++
 .../ranger/audit/provider/AuditDestination.java |  70 ++
 .../ranger/audit/provider/AuditFileSpool.java   | 875 +++++++++++++++++++
 .../audit/provider/AuditMessageException.java   |  67 ++
 .../ranger/audit/provider/AuditProvider.java    |  22 +-
 .../audit/provider/AuditProviderFactory.java    | 386 +++++---
 .../audit/provider/BaseAuditProvider.java       | 400 +++++++--
 .../audit/provider/BufferedAuditProvider.java   |  32 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  47 +-
 .../audit/provider/DummyAuditProvider.java      |  76 +-
 .../audit/provider/FileAuditDestination.java    | 230 +++++
 .../audit/provider/HDFSAuditDestination.java    | 243 +++++
 .../audit/provider/LocalFileLogBuffer.java      |  17 +-
 .../audit/provider/Log4jAuditProvider.java      |  57 +-
 .../ranger/audit/provider/LogDestination.java   |  16 +-
 .../apache/ranger/audit/provider/MiscUtil.java  | 291 ++++--
 .../audit/provider/MultiDestAuditProvider.java  | 153 +++-
 .../audit/provider/hdfs/HdfsAuditProvider.java  |   3 +-
 .../audit/provider/hdfs/HdfsLogDestination.java |  48 +-
 .../provider/kafka/KafkaAuditProvider.java      |  46 +-
 .../audit/provider/solr/SolrAuditProvider.java  |  52 +-
 security-admin/.gitignore                       |   3 +
 security-admin/pom.xml                          |   6 +-
 .../apache/ranger/audit/TestAuditProcessor.java | 786 +++++++++++++++++
 29 files changed, 4081 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2c746ed..dd4e2c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 *.class
 *.iml
+.pydevproject
 .settings/
 .metadata
 .classpath

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 82fcab8..a44e047 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -17,24 +17,26 @@
  * under the License.
  */
 
- package org.apache.ranger.audit.model;
+package org.apache.ranger.audit.model;
 
 import org.apache.ranger.audit.dao.DaoManager;
 
-
 public abstract class AuditEventBase {
+
        protected AuditEventBase() {
        }
 
        public abstract void persist(DaoManager daoManager);
-       
+
        protected String trim(String str, int len) {
-               String ret = str ;
+               String ret = str;
                if (str != null) {
                        if (str.length() > len) {
-                               ret = str.substring(0,len) ;
+                               ret = str.substring(0, len);
                        }
                }
-               return ret ;
+               return ret;
        }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
index d0c1526..af89f60 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import org.apache.ranger.audit.dao.DaoManager;
 import org.apache.ranger.audit.entity.AuthzAuditEventDbObj;
 
+import com.google.gson.Gson;
 import com.google.gson.annotations.SerializedName;
 
 
@@ -33,7 +34,6 @@ public class AuthzAuditEvent extends AuditEventBase {
        protected static final int MAX_ACTION_FIELD_SIZE       = 1800 ;
        protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ;
 
-
        @SerializedName("repoType")
        protected int    repositoryType = 0;
 
@@ -94,6 +94,17 @@ public class AuthzAuditEvent extends AuditEventBase {
        @SerializedName("id")
        protected String eventId        = null;
 
+       /**
+        * This is to ensure order within a session. Order is not guaranteed
+        * across processes and hosts.
+        */
+       @SerializedName("seq_num")
+       protected long seqNum = 0;
+
+       @SerializedName("freq_count")
+       protected long frequencyCount = 1;
+
+       @SerializedName("freq_dur_ms")
+       protected long frequencyDurationMS = 0;
 
        public AuthzAuditEvent() {
                super();
@@ -400,6 +411,31 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
 
+       
+       public long getSeqNum() {
+               return seqNum;
+       }
+
+       public void setSeqNum(long seqNum) {
+               this.seqNum = seqNum;
+       }
+
+       public long getFrequencyCount() {
+               return frequencyCount;
+       }
+
+       public void setFrequencyCount(long frequencyCount) {
+               this.frequencyCount = frequencyCount;
+       }
+
+       public long getFrequencyDurationMS() {
+               return frequencyDurationMS;
+       }
+
+       public void setFrequencyDurationMS(long frequencyDurationMS) {
+               this.frequencyDurationMS = frequencyDurationMS;
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
@@ -432,6 +468,9 @@ public class AuthzAuditEvent extends AuditEventBase {
                  
.append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR)
                  .append("logType=").append(logType).append(FIELD_SEPARATOR)
                  .append("eventId=").append(eventId).append(FIELD_SEPARATOR)
+                 .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR)
+                 
.append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR)
+                 
.append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR)
                ;
                return sb;
        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 5da5064..53adc86 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -90,10 +90,11 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
        }
 
        @Override
-       public void log(AuditEventBase event) {
+       public boolean log(AuditEventBase event) {
                LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
 
                queueEvent(event);
+               return true;
        }
 
        @Override
@@ -230,7 +231,7 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
                return mQueue.isEmpty();
        }
 
-       private void waitToComplete(long maxWaitSeconds) {
+       public void waitToComplete(long maxWaitSeconds) {
                LOG.debug("==> AsyncAuditProvider.waitToComplete()");
 
                for (long waitTime = 0; !isEmpty()

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
new file mode 100644
index 0000000..5553bcc
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * Asynchronous hand-off queue between audit producers and a downstream
+ * consumer.
+ * <p>
+ * {@link #log(AuditEventBase)} never blocks: the event is appended to an
+ * in-memory {@link LinkedTransferQueue} and the call returns immediately. The
+ * underlying queue is unbounded, but events are rejected (the method returns
+ * <code>false</code>) once the number of queued events reaches
+ * {@link #getMaxQueueSize()}. A daemon thread created by {@link #start()}
+ * drains the queue in batches and forwards each batch to the consumer.
+ */
+public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+       private static final Log logger = LogFactory
+                       .getLog(AuditAsyncQueue.class);
+
+       /**
+        * Maximum number of events handed to the consumer in a single call.
+        * TODO: Replace the hard coded value with the batch size property.
+        */
+       static final int MAX_DRAIN_SIZE = 1000;
+
+       LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+       Thread consumerThread = null;
+
+       static int threadCount = 0;
+       static final String DEFAULT_NAME = "async";
+
+       /**
+        * Creates a queue without a consumer, named {@link #DEFAULT_NAME}.
+        */
+       public AuditAsyncQueue() {
+               setName(DEFAULT_NAME);
+       }
+
+       /**
+        * Creates a queue named {@link #DEFAULT_NAME} that forwards to the
+        * given consumer.
+        * 
+        * @param consumer
+        *            provider that receives the batches drained from this queue
+        */
+       public AuditAsyncQueue(AuditProvider consumer) {
+               super(consumer);
+               setName(DEFAULT_NAME);
+       }
+
+       /**
+        * Queues a single event without blocking.
+        * 
+        * @param event
+        *            the event to queue
+        * @return <code>true</code> if the event was queued, <code>false</code>
+        *         if it was rejected because the queue already holds
+        *         {@link #getMaxQueueSize()} events
+        * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.audit.model.AuditEventBase)
+        */
+       @Override
+       public boolean log(AuditEventBase event) {
+               // Add to the queue and return ASAP
+               if (queue.size() >= getMaxQueueSize()) {
+                       return false;
+               }
+               queue.add(event);
+               addLifeTimeInLogCount(1);
+               return true;
+       }
+
+       /**
+        * Queues the events one by one, stopping at the first event that is
+        * rejected.
+        * 
+        * @param events
+        *            the events to queue
+        * @return <code>true</code> if every event was queued,
+        *         <code>false</code> as soon as one is rejected (the remaining
+        *         events are not queued)
+        */
+       @Override
+       public boolean log(Collection<AuditEventBase> events) {
+               for (AuditEventBase event : events) {
+                       // Propagate the rejection instead of reporting success for
+                       // events that were dropped
+                       if (!log(event)) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Starts the consumer (when one is set) and then the daemon thread that
+        * drains this queue.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#start()
+        */
+       @Override
+       public void start() {
+               if (consumer != null) {
+                       consumer.start();
+               }
+
+               consumerThread = new Thread(this, this.getClass().getName()
+                               + (threadCount++));
+               consumerThread.setDaemon(true);
+               consumerThread.start();
+       }
+
+       /**
+        * Switches the queue to drain mode and interrupts the consumer thread
+        * so that it stops waiting for new events.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+        */
+       @Override
+       public void stop() {
+               setDrain(true);
+               try {
+                       // Nothing to interrupt if start() was never called
+                       if (consumerThread != null) {
+                               consumerThread.interrupt();
+                       }
+               } catch (Throwable t) {
+                       // ignore any exception
+               }
+       }
+
+       /**
+        * @return <code>true</code> while events are still queued here;
+        *         otherwise whatever the consumer reports, or
+        *         <code>false</code> when there is no consumer
+        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        */
+       @Override
+       public boolean isFlushPending() {
+               if (queue.isEmpty()) {
+                       return consumer != null && consumer.isFlushPending();
+               }
+               return true;
+       }
+
+       /**
+        * Consumer loop: takes events off the queue, groups up to
+        * {@link #MAX_DRAIN_SIZE} of them and passes the batch to the consumer.
+        * Once drain mode is set and the queue is empty the loop ends and the
+        * consumer is stopped.
+        * 
+        * @see java.lang.Runnable#run()
+        */
+       @Override
+       public void run() {
+               while (true) {
+                       try {
+                               AuditEventBase event = null;
+                               if (!isDrain()) {
+                                       // For Transfer queue take() is blocking
+                                       event = queue.take();
+                               } else {
+                                       // For Transfer queue poll() is non blocking
+                                       event = queue.poll();
+                               }
+                               if (event != null) {
+                                       // A fresh list per batch, so the consumer may keep the
+                                       // reference without it being modified afterwards
+                                       Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+                                       eventList.add(event);
+                                       queue.drainTo(eventList, MAX_DRAIN_SIZE - 1);
+                                       consumer.log(eventList);
+                               }
+                       } catch (InterruptedException e) {
+                               logger.info(
+                                               "Caught exception in consumer thread. Mostly to abort loop",
+                                               e);
+                       } catch (Throwable t) {
+                               logger.error("Caught error during processing request.", t);
+                       }
+                       if (isDrain() && queue.isEmpty()) {
+                               break;
+                       }
+               }
+               try {
+                       // Call stop on the consumer
+                       if (consumer != null) {
+                               consumer.stop();
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error while calling stop on consumer.", t);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
new file mode 100644
index 0000000..58d122a
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * Batching stage that sits in front of a consumer.
+ * <p>
+ * Events passed to {@link #log(AuditEventBase)} are placed on a bounded
+ * {@link ArrayBlockingQueue} created by {@link #start()}. A daemon thread
+ * collects them into batches of up to {@link #getMaxBatchSize()} events, or
+ * whatever has accumulated after {@link #getMaxBatchInterval()} milliseconds,
+ * and hands each batch to the consumer. When file spooling is enabled, batches
+ * the consumer rejects are stashed in the file spooler instead of being
+ * dropped.
+ */
+public class AuditBatchProcessor extends BaseAuditProvider implements Runnable {
+       private static final Log logger = LogFactory
+                       .getLog(AuditBatchProcessor.class);
+
+       private BlockingQueue<AuditEventBase> queue = null;
+       private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>();
+
+       Thread consumerThread = null;
+       static int threadCount = 0;
+
+       public AuditBatchProcessor() {
+       }
+
+       /**
+        * @param consumer
+        *            provider that receives the batches built by this processor
+        */
+       public AuditBatchProcessor(AuditProvider consumer) {
+               super(consumer);
+       }
+
+       /**
+        * Queues a single event, waiting for space if the queue is full.
+        * 
+        * @param event
+        *            the event to queue
+        * @return <code>true</code> if the event was queued, <code>false</code>
+        *         if the calling thread was interrupted while waiting for space
+        * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.audit.model.AuditEventBase)
+        */
+       @Override
+       public boolean log(AuditEventBase event) {
+               try {
+                       // Add to batchQueue. Block if full. put() waits for space,
+                       // whereas add() throws IllegalStateException on a full queue
+                       queue.put(event);
+               } catch (InterruptedException e) {
+                       // The event was not queued; keep the interrupt status visible
+                       // to the caller
+                       Thread.currentThread().interrupt();
+                       return false;
+               }
+               addLifeTimeInLogCount(1);
+               return true;
+       }
+
+       /**
+        * Queues the events one by one, stopping at the first one that could
+        * not be queued.
+        * 
+        * @param events
+        *            the events to queue
+        * @return <code>true</code> if every event was queued,
+        *         <code>false</code> otherwise
+        */
+       @Override
+       public boolean log(Collection<AuditEventBase> events) {
+               for (AuditEventBase event : events) {
+                       if (!log(event)) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Initializes the provider from the given properties.
+        * 
+        * @param prop
+        *            configuration properties
+        * @param basePropertyName
+        *            property prefix to use; when <code>null</code> the prefix
+        *            "xasecure.audit.batch" is used
+        */
+       @Override
+       public void init(Properties prop, String basePropertyName) {
+               String propPrefix = "xasecure.audit.batch";
+               if (basePropertyName != null) {
+                       propPrefix = basePropertyName;
+               }
+
+               super.init(prop, propPrefix);
+
+       }
+
+       /**
+        * Creates the queue and starts, in this order, the consumer, the file
+        * spooler (if enabled) and the batching thread. Calling it again after
+        * a successful start only logs an error.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#start()
+        */
+       @Override
+       synchronized public void start() {
+               if (consumerThread != null) {
+                       logger.error("Provider is already started. name=" + getName());
+                       return;
+               }
+               logger.info("Creating ArrayBlockingQueue with maxSize="
+                               + getMaxQueueSize());
+               queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
+
+               // Start the consumer first
+               consumer.start();
+
+               // Then the FileSpooler
+               if (fileSpoolerEnabled) {
+                       fileSpooler.start();
+               }
+
+               // Finally the queue listener
+               consumerThread = new Thread(this, this.getClass().getName()
+                               + (threadCount++));
+               consumerThread.setDaemon(true);
+               consumerThread.start();
+
+       }
+
+       /**
+        * Switches to drain mode, flushes, and interrupts the batching thread
+        * so that it stops waiting for new events.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+        */
+       @Override
+       public void stop() {
+               setDrain(true);
+               flush();
+               try {
+                       // Nothing to interrupt if start() was never called
+                       if (consumerThread != null) {
+                               consumerThread.interrupt();
+                       }
+               } catch (Throwable t) {
+                       // ignore any exception
+               }
+       }
+
+       /**
+        * Waits without a time limit. The consumer is waited on by
+        * {@link #waitToComplete(long)}, so it is not called again here.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+        */
+       @Override
+       public void waitToComplete() {
+               int defaultTimeOut = -1;
+               waitToComplete(defaultTimeOut);
+       }
+
+       /**
+        * Switches to drain mode and waits until the queue and the local batch
+        * buffer are empty, then waits on the consumer.
+        * <p>
+        * The wait ends early when the timeout elapses, when the waiting thread
+        * is interrupted, or when the queue size has not changed for more than
+        * five consecutive checks (one check per second).
+        * 
+        * @param timeout
+        *            maximum time to wait in milliseconds; values less than or
+        *            equal to zero disable the time limit
+        */
+       @Override
+       public void waitToComplete(long timeout) {
+               setDrain(true);
+               flush();
+               long sleepTime = 1000;
+               long startTime = System.currentTimeMillis();
+               int prevQueueSize = -1;
+               int staticLoopCount = 0;
+               // queue is null when start() was never called; nothing to wait for
+               while (queue != null
+                               && (queue.size() > 0 || localBatchBuffer.size() > 0)) {
+                       int currentQueueSize = queue.size();
+                       if (prevQueueSize == currentQueueSize) {
+                               logger.error("Queue size is not changing. " + getName()
+                                               + ".size=" + currentQueueSize);
+                               staticLoopCount++;
+                               if (staticLoopCount > 5) {
+                                       logger.error("Aborting writing to consumer. Some logs will be discarded."
+                                                       + getName() + ".size=" + currentQueueSize);
+                                       // Give up, as the message says
+                                       break;
+                               }
+                       } else {
+                               staticLoopCount = 0;
+                               // Remember the size so that the next check can detect a
+                               // stalled queue
+                               prevQueueSize = currentQueueSize;
+                       }
+                       if (consumerThread != null) {
+                               // Wake the batching thread if it is sleeping or polling
+                               consumerThread.interrupt();
+                       }
+                       try {
+                               Thread.sleep(sleepTime);
+                               if (timeout > 0
+                                               && (System.currentTimeMillis() - startTime > timeout)) {
+                                       break;
+                               }
+                       } catch (InterruptedException e) {
+                               break;
+                       }
+               }
+               consumer.waitToComplete(timeout);
+       }
+
+       /**
+        * @return <code>true</code> while events are still queued here;
+        *         otherwise whatever the consumer reports
+        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        */
+       @Override
+       public boolean isFlushPending() {
+               // queue is null until start() has been called
+               if (queue == null || queue.isEmpty()) {
+                       return consumer.isFlushPending();
+               }
+               return true;
+       }
+
+       /**
+        * Flushes the file spooler (if enabled) and then the consumer.
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+        */
+       @Override
+       public void flush() {
+               if (fileSpoolerEnabled) {
+                       fileSpooler.flush();
+               }
+               consumer.flush();
+       }
+
+       /**
+        * Batching loop. Each iteration decides whether to wait, to read from
+        * the queue, and whether the collected batch goes to the consumer or to
+        * the file spooler. The loop ends once drain mode is set and both the
+        * queue and the local buffer are empty; the consumer and the file
+        * spooler (if enabled) are then stopped.
+        * 
+        * @see java.lang.Runnable#run()
+        */
+       @Override
+       public void run() {
+               long lastDispatchTime = System.currentTimeMillis();
+               boolean isDestActive = true;
+               while (true) {
+                       // Time to next dispatch
+                       long nextDispatchDuration = lastDispatchTime
+                                       - System.currentTimeMillis() + getMaxBatchInterval();
+
+                       boolean isToSpool = false;
+                       boolean fileSpoolDrain = false;
+                       try {
+                               if (fileSpoolerEnabled && fileSpooler.isPending()) {
+                                       // Share of the queue capacity that is currently occupied
+                                       int percentUsed = queue.size() * 100
+                                                       / getMaxQueueSize();
+                                       long lastAttemptDelta = fileSpooler
+                                                       .getLastAttemptTimeDelta();
+
+                                       fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
+                                       // If we should even read from queue?
+                                       if (!isDrain() && !fileSpoolDrain
+                                                       && percentUsed < fileSpoolDrainThresholdPercent) {
+                                               // Since some files are still under progress and it is
+                                               // not in drain mode, lets wait and retry
+                                               if (nextDispatchDuration > 0) {
+                                                       Thread.sleep(nextDispatchDuration);
+                                               }
+                                               // Restart the interval; otherwise the duration stays
+                                               // <= 0 and this branch would spin without sleeping
+                                               lastDispatchTime = System.currentTimeMillis();
+                                               continue;
+                                       }
+                                       isToSpool = true;
+                               }
+
+                               AuditEventBase event = null;
+
+                               if (!isToSpool && !isDrain() && !fileSpoolDrain
+                                               && nextDispatchDuration > 0) {
+                                       event = queue.poll(nextDispatchDuration,
+                                                       TimeUnit.MILLISECONDS);
+
+                               } else {
+                                       // For poll() is non blocking
+                                       event = queue.poll();
+                               }
+                               if (event != null) {
+                                       localBatchBuffer.add(event);
+                                       if (getMaxBatchSize() >= localBatchBuffer.size()) {
+                                               queue.drainTo(localBatchBuffer, getMaxBatchSize()
+                                                               - localBatchBuffer.size());
+                                       }
+                               }
+                       } catch (InterruptedException e) {
+                               logger.info(
+                                               "Caught exception in consumer thread. Mostly to abort loop",
+                                               e);
+                       } catch (Throwable t) {
+                               logger.error("Caught error during processing request.", t);
+                       }
+
+                       if (localBatchBuffer.size() > 0 && isToSpool) {
+                               // Let spool to the file directly
+                               if (isDestActive) {
+                                       logger.info("Switching to file spool. Queue=" + getName()
+                                                       + ", dest=" + consumer.getName());
+                               }
+                               isDestActive = false;
+                               fileSpooler.stashLogs(localBatchBuffer);
+                               localBatchBuffer.clear();
+                               // Reset all variables
+                               lastDispatchTime = System.currentTimeMillis();
+                       } else if (localBatchBuffer.size() > 0
+                                       && (isDrain()
+                                                       || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+                               if (fileSpoolerEnabled && !isDestActive) {
+                                       logger.info("Switching to writing to destination. Queue="
+                                                       + getName() + ", dest=" + consumer.getName());
+                               }
+                               boolean ret = consumer.log(localBatchBuffer);
+                               if (!ret) {
+                                       if (fileSpoolerEnabled) {
+                                               logger.info("Switching to file spool. Queue="
+                                                               + getName() + ", dest=" + consumer.getName());
+                                               // Transient error. Stash and move on
+                                               fileSpooler.stashLogs(localBatchBuffer);
+                                               isDestActive = false;
+                                       } else {
+                                               // We need to drop this event
+                                               logFailedEvent(localBatchBuffer, null);
+                                       }
+                               } else {
+                                       isDestActive = true;
+                               }
+                               localBatchBuffer.clear();
+                               // Reset all variables
+                               lastDispatchTime = System.currentTimeMillis();
+                       }
+
+                       if (isDrain()) {
+                               if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
+                                       logger.info("Queue is not empty. Will retry. queue.size)="
+                                                       + queue.size() + ", localBatchBuffer.size()="
+                                                       + localBatchBuffer.size());
+                               } else {
+                                       break;
+                               }
+                       }
+               }
+
+               logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+                               + consumer.getName());
+               try {
+                       // Call stop on the consumer
+                       consumer.stop();
+                       if (fileSpoolerEnabled) {
+                               fileSpooler.stop();
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error while calling stop on consumer.", t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
new file mode 100644
index 0000000..11c32ca
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for audit destinations; extend it to build a custom
+ * destination.
+ *
+ * The flush hooks supplied here are no-ops: {@link #isFlushPending()} always
+ * answers {@code false} and {@link #flush()} does nothing.
+ */
+public abstract class AuditDestination extends BaseAuditProvider {
+       private static final Log logger = LogFactory.getLog(AuditDestination.class);
+
+       /**
+        * Logs that the destination is being constructed.
+        */
+       public AuditDestination() {
+               logger.info("AuditDestination() enter");
+       }
+
+       /**
+        * Hands both arguments to the superclass initializer unchanged.
+        *
+        * @param prop             configuration properties
+        * @param basePropertyName property prefix forwarded to the superclass
+        * @see org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties,
+        *      java.lang.String)
+        */
+       @Override
+       public void init(Properties prop, String basePropertyName) {
+               super.init(prop, basePropertyName);
+       }
+
+       /**
+        * @return always {@code false} in this base implementation
+        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        */
+       @Override
+       public boolean isFlushPending() {
+               return false;
+       }
+
+       /**
+        * Does nothing in this base implementation.
+        *
+        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+        */
+       @Override
+       public void flush() {
+               // Intentionally empty.
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
new file mode 100644
index 0000000..8b006de
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * This class temporarily stores logs in file system if the destination is
+ * overloaded or down
+ */
+public class AuditFileSpool implements Runnable {
+       private static final Log logger = 
LogFactory.getLog(AuditFileSpool.class);
+
+       /**
+        * Status stored on each spool-file index record.
+        *
+        * As used in this class: a newly created spool file is recorded as
+        * write_inprogress; closeFileIfNeeded() switches it to pending when the
+        * file is closed; init() re-queues records found as read_inprogress or
+        * pending; and init() treats any status other than done as outstanding
+        * work.
+        */
+       public enum SPOOL_FILE_STATUS {
+               pending, write_inprogress, read_inprogress, done
+       }
+
+       // Property-name suffixes. init() appends each one, after a ".", to the
+       // property prefix to build the full key it looks up.
+
+       // Folder that holds the spool files and the index files.
+       public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
+       // Name pattern for new spool files; init() supplies a default when unset.
+       public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = 
"filespool.filename.format";
+       // Archive folder; init() defaults to "archive" under the spool folder.
+       public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = 
"filespool.archive.dir";
+       // Overrides the default of maxArchiveFiles.
+       public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = 
"filespool.archive.max.files";
+       // Used by init() to build the default index file name.
+       public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = 
"filespool.file.prefix";
+       // Overrides the default of fileRolloverSec (seconds).
+       public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = 
"filespool.file.rollover.sec";
+       // Name of the index file inside the spool folder.
+       public static final String PROP_FILE_SPOOL_INDEX_FILE = 
"filespool.index.filename";
+       // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
+       // "filespool.index.done_filename";
+       // Overrides the default of retryDestinationMS (milliseconds).
+       public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = 
"filespool.destination.retry.ms";
+
+       // Both providers are supplied through the constructor; their names are
+       // used in this class's log messages.
+       AuditProvider queueProvider = null;
+       AuditProvider consumerProvider = null;
+
+       // Index records waiting to be consumed. Filled by init() from the
+       // reloaded index and by closeFileIfNeeded() when a file is closed.
+       BlockingQueue<AuditIndexRecord> indexQueue = new 
LinkedTransferQueue<AuditIndexRecord>();
+
+       // Folder and File attributes
+       // All of these are populated by init().
+       File logFolder = null;
+       String logFileNameFormat = null;
+       File archiveFolder = null;
+       String fileNamePrefix = null;
+       String indexFileName = null;
+       File indexFile = null;
+       // Derived in init() from indexFileName by replacing the extension with
+       // "_closed.json".
+       String indexDoneFileName = null;
+       File indexDoneFile = null;
+       int retryDestinationMS = 30 * 1000; // Default 30 seconds
+       int fileRolloverSec = 24 * 60 * 60; // In seconds
+       int maxArchiveFiles = 100;
+
+       // NOTE(review): presumably these two throttle logError(); its body is
+       // not visible in this part of the file - confirm.
+       int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+       long lastErrorLogMS = 0;
+
+       // In-memory copy of the index file: loaded by loadIndexFile() and
+       // written back by saveIndexFile().
+       List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
+
+       // Set to true by init() when any reloaded record is not done, and by
+       // stashLogs(AuditEventBase) after an event is written.
+       boolean isPending = false;
+       // 0 makes getLastAttemptTimeDelta() return 0.
+       long lastAttemptTime = 0;
+       // Set to true only when init() runs to completion.
+       boolean initDone = false;
+
+       // Writer for the spool file currently being written; null when no file
+       // is open.
+       PrintWriter logWriter = null;
+       AuditIndexRecord currentWriterIndexRecord = null;
+       AuditIndexRecord currentConsumerIndexRecord = null;
+
+       BufferedReader logReader = null;
+
+       // Created and started by start(); interrupted by stop().
+       Thread destinationThread = null;
+
+       // Set to true while a stash method is writing and back to false when it
+       // finishes; stop() polls it before closing the writer.
+       boolean isWriting = true;
+       // Set by stop(); the stash methods reject events once it is true.
+       boolean isDrain = false;
+       // NOTE(review): not read or written in the visible part of the file -
+       // confirm its use further down.
+       boolean isDestDown = true;
+
+       // Shared by all instances, but (re)assigned by init() on each instance.
+       private static Gson gson = null;
+
+       /**
+        * Remembers the two providers this spooler works with. Nothing else
+        * happens until init() is called.
+        *
+        * @param queueProvider    provider stored as the queue side
+        * @param consumerProvider provider stored as the consumer side
+        */
+       public AuditFileSpool(AuditProvider queueProvider, AuditProvider consumerProvider) {
+               this.consumerProvider = consumerProvider;
+               this.queueProvider = queueProvider;
+       }
+
+       /**
+        * Initializes the spooler without a caller-supplied property prefix by
+        * delegating to init(Properties, String) with a null prefix.
+        *
+        * @param prop configuration properties
+        */
+       public void init(Properties prop) {
+               final String noBasePropertyName = null;
+               init(prop, noBasePropertyName);
+       }
+
+       /**
+        * Reads the spooler configuration, makes sure the spool and archive
+        * folders and both index files exist, reloads the index, and queues the
+        * records that still have to be consumed.
+        *
+        * Any failure is logged and leaves the spooler uninitialized
+        * (initDone stays false). Calling it again after a successful run only
+        * logs an error.
+        *
+        * @param props            configuration properties
+        * @param basePropertyName prefix for every property key; when null,
+        *                         "xasecure.audit.filespool" is used
+        */
+       public void init(Properties props, String basePropertyName) {
+               if (initDone) {
+                       logger.error("init() called more than once. queueProvider="
+                                       + queueProvider.getName() + ", consumerProvider="
+                                       + consumerProvider.getName());
+                       return;
+               }
+               String propPrefix = "xasecure.audit.filespool";
+               if (basePropertyName != null) {
+                       propPrefix = basePropertyName;
+               }
+
+               try {
+                       gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+                                       .create();
+
+                       // Initial folder and file properties
+                       String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+                                       + "." + PROP_FILE_SPOOL_LOCAL_DIR);
+                       // Use propPrefix like every other key. Building this key from
+                       // basePropertyName produced "null.filespool.filename.format"
+                       // whenever no base name was supplied.
+                       logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix
+                                       + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
+                       String archiveFolderProp = MiscUtil.getStringProperty(props,
+                                       propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
+                       fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
+                                       + PROP_FILE_SPOOL_FILENAME_PREFIX);
+                       indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
+                                       + PROP_FILE_SPOOL_INDEX_FILE);
+                       retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
+                                       + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
+                       fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+                                       + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
+                       maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
+                                       + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
+
+                       logger.info("retryDestinationMS=" + retryDestinationMS
+                                       + ", queueName=" + queueProvider.getName());
+                       logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
+                                       + queueProvider.getName());
+                       logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
+                                       + queueProvider.getName());
+
+                       if (logFolderProp == null || logFolderProp.isEmpty()) {
+                               logger.error("Audit spool folder is not configured. Please set "
+                                               + propPrefix
+                                               + "."
+                                               + PROP_FILE_SPOOL_LOCAL_DIR
+                                               + ". queueName=" + queueProvider.getName());
+                               return;
+                       }
+                       logFolder = new File(logFolderProp);
+                       if (!logFolder.isDirectory()) {
+                               logFolder.mkdirs();
+                               if (!logFolder.isDirectory()) {
+                                       logger.error("File Spool folder not found and can't be created. folder="
+                                                       + logFolder.getAbsolutePath()
+                                                       + ", queueName="
+                                                       + queueProvider.getName());
+                                       return;
+                               }
+                       }
+                       logger.info("logFolder=" + logFolder + ", queueName="
+                                       + queueProvider.getName());
+
+                       if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+                               logFileNameFormat = "spool_" + "%app-type%" + "_"
+                                               + "%time:yyyyMMdd-HHmm.ss%.log";
+                       }
+                       logger.info("logFileNameFormat=" + logFileNameFormat
+                                       + ", queueName=" + queueProvider.getName());
+
+                       if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
+                               archiveFolder = new File(logFolder, "archive");
+                       } else {
+                               archiveFolder = new File(archiveFolderProp);
+                       }
+                       if (!archiveFolder.isDirectory()) {
+                               archiveFolder.mkdirs();
+                               if (!archiveFolder.isDirectory()) {
+                                       logger.error("File Spool archive folder not found and can't be created. folder="
+                                                       + archiveFolder.getAbsolutePath()
+                                                       + ", queueName="
+                                                       + queueProvider.getName());
+                                       return;
+                               }
+                       }
+                       logger.info("archiveFolder=" + archiveFolder + ", queueName="
+                                       + queueProvider.getName());
+
+                       if (indexFileName == null || indexFileName.isEmpty()) {
+                               indexFileName = "index_" + fileNamePrefix + ".json";
+                       }
+
+                       indexFile = new File(logFolder, indexFileName);
+                       if (!indexFile.exists()) {
+                               indexFile.createNewFile();
+                       }
+                       logger.info("indexFile=" + indexFile + ", queueName="
+                                       + queueProvider.getName());
+
+                       // A configured index file name may have no extension; in that
+                       // case keep the whole name instead of calling substring(0, -1).
+                       int lastDot = indexFileName.lastIndexOf('.');
+                       String indexBaseName = lastDot < 0 ? indexFileName
+                                       : indexFileName.substring(0, lastDot);
+                       indexDoneFileName = indexBaseName + "_closed.json";
+                       indexDoneFile = new File(logFolder, indexDoneFileName);
+                       if (!indexDoneFile.exists()) {
+                               indexDoneFile.createNewFile();
+                       }
+                       logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+                                       + queueProvider.getName());
+
+                       // Load index file
+                       loadIndexFile();
+                       for (AuditIndexRecord auditIndexRecord : indexRecords) {
+                               if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
+                                       isPending = true;
+                               }
+                               if (auditIndexRecord.status
+                                               .equals(SPOOL_FILE_STATUS.write_inprogress)) {
+                                       currentWriterIndexRecord = auditIndexRecord;
+                                       logger.info("currentWriterIndexRecord="
+                                                       + currentWriterIndexRecord.filePath
+                                                       + ", queueName=" + queueProvider.getName());
+                               }
+                               // Files that were being read go to the head of the queue.
+                               if (auditIndexRecord.status
+                                               .equals(SPOOL_FILE_STATUS.read_inprogress)) {
+                                       indexQueue.add(auditIndexRecord);
+                               }
+                       }
+                       printIndex();
+                       // Second pass: queue the remaining pending records, in index
+                       // order, behind the read_inprogress ones.
+                       for (AuditIndexRecord auditIndexRecord : indexRecords) {
+                               if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
+                                       File consumerFile = new File(auditIndexRecord.filePath);
+                                       if (!consumerFile.exists()) {
+                                               logger.error("INIT: Consumer file="
+                                                               + consumerFile.getPath() + " not found.");
+                                               // NOTE(review): this terminates the whole JVM that
+                                               // hosts the spooler because one spool file is
+                                               // missing. Consider skipping the record instead,
+                                               // once the effect on isPending is confirmed.
+                                               System.exit(1);
+                                       }
+                                       indexQueue.add(auditIndexRecord);
+                               }
+                       }
+
+               } catch (Throwable t) {
+                       logger.fatal("Error initializing File Spooler. queue="
+                                       + queueProvider.getName(), t);
+                       return;
+               }
+               initDone = true;
+       }
+
+       /**
+        * Creates and starts the daemon thread that runs this spooler. Logs an
+        * error and does nothing when init() has not completed.
+        */
+       public void start() {
+               if (!initDone) {
+                       logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName="
+                                       + queueProvider.getName());
+                       return;
+               }
+
+               logger.info("Starting writerThread, queueName=" + queueProvider.getName()
+                               + ", consumer=" + consumerProvider.getName());
+
+               // The thread is named after the queue and runs this object's run().
+               String threadName = queueProvider.getName() + "_destWriter";
+               destinationThread = new Thread(this, threadName);
+               destinationThread.setDaemon(true);
+               destinationThread.start();
+       }
+
+       /**
+        * Stops the spooler: marks it as draining, flushes and closes the open
+        * spool file, and interrupts the destination thread.
+        *
+        * Waits up to about three seconds for an in-progress write to finish
+        * before closing the file. If the write is still running after that, the
+        * file is left open (it was already flushed once at the start of the
+        * call). Logs an error and returns when init() has not completed.
+        */
+       public void stop() {
+               if (!initDone) {
+                       logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName="
+                                       + queueProvider.getName());
+                       return;
+               }
+               logger.info("Stop called, queueName=" + queueProvider.getName()
+                               + ", consumer=" + consumerProvider.getName());
+
+               isDrain = true;
+               flush();
+
+               PrintWriter out = getOpenLogFileStream();
+               if (out != null) {
+                       // If write is still going on, then let's give it enough time to
+                       // complete
+                       for (int i = 0; i < 3 && isWriting; i++) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                                       // Keep the interrupt visible to the caller and stop
+                                       // waiting.
+                                       Thread.currentThread().interrupt();
+                                       break;
+                               }
+                       }
+                       // Close exactly once. The previous loop had no break, so it
+                       // flushed, closed and logged up to three times.
+                       if (!isWriting) {
+                               try {
+                                       logger.info("Closing open file, queueName="
+                                                       + queueProvider.getName() + ", consumer="
+                                                       + consumerProvider.getName());
+
+                                       out.flush();
+                                       out.close();
+                               } catch (Throwable t) {
+                                       logger.debug("Error closing spool out file.", t);
+                               }
+                       }
+               }
+               // destinationThread is null when start() was never called.
+               if (destinationThread != null) {
+                       try {
+                               destinationThread.interrupt();
+                       } catch (Throwable e) {
+                               // ignore
+                       }
+               }
+       }
+
+       /**
+        * Flushes the spool file writer if one is currently open. Logs an error
+        * and does nothing when init() has not completed.
+        */
+       public void flush() {
+               if (initDone) {
+                       PrintWriter openWriter = getOpenLogFileStream();
+                       if (openWriter != null) {
+                               openWriter.flush();
+                       }
+                       return;
+               }
+               logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName="
+                               + queueProvider.getName());
+       }
+
+       /**
+        * Reports the spooler's pending flag.
+        *
+        * @return the current value of the pending flag, or false (after
+        *         reporting an error) when init() has not completed
+        */
+       public boolean isPending() {
+               if (initDone) {
+                       return isPending;
+               }
+
+               logError("isPending(): File Spooler not initialized. queueName="
+                               + queueProvider.getName());
+               return false;
+       }
+
+       /**
+        * Time elapsed since the last recorded attempt.
+        *
+        * @return milliseconds between lastAttemptTime and now, or 0 when
+        *         lastAttemptTime is still 0
+        */
+       public long getLastAttemptTimeDelta() {
+               return lastAttemptTime == 0 ? 0 : System.currentTimeMillis()
+                               - lastAttemptTime;
+       }
+
+       /**
+        * Writes one event to the current spool file as a line of text and marks
+        * the spooler as having pending data. Write failures are logged, not
+        * thrown. The event is rejected with an error log once stop() has been
+        * called.
+        *
+        * @param event event to spool
+        */
+       synchronized public void stashLogs(AuditEventBase event) {
+               if (isDrain) {
+                       // Stop has been called, so this method shouldn't be called
+                       logger.error("stashLogs() is called after stop is called. event="
+                                       + event);
+                       return;
+               }
+
+               isWriting = true;
+               try {
+                       PrintWriter spoolWriter = getLogFileStream();
+                       // Convert event to json and write it as one line
+                       spoolWriter.println(MiscUtil.stringify(event));
+                       isPending = true;
+               } catch (Exception writeError) {
+                       logger.error("Error writing to file. event=" + event, writeError);
+               } finally {
+                       isWriting = false;
+               }
+       }
+
+       /**
+        * Spools every event of the collection, in iteration order, then flushes
+        * the spool file.
+        *
+        * @param events events to spool
+        */
+       synchronized public void stashLogs(Collection<AuditEventBase> events) {
+               Iterator<AuditEventBase> remaining = events.iterator();
+               while (remaining.hasNext()) {
+                       stashLogs(remaining.next());
+               }
+               flush();
+       }
+
+       /**
+        * Writes one already-serialized event to the current spool file as a
+        * line of text and marks the spooler as having pending data, as
+        * stashLogs(AuditEventBase) does. Write failures are logged, not thrown.
+        * The event is rejected with an error log once stop() has been called.
+        *
+        * @param event event text to spool
+        */
+       synchronized public void stashLogsString(String event) {
+               if (isDrain) {
+                       // Stop has been called, so this method shouldn't be called
+                       logger.error("stashLogsString() is called after stop is called. event="
+                                       + event);
+                       return;
+               }
+               try {
+                       isWriting = true;
+                       PrintWriter logOut = getLogFileStream();
+                       logOut.println(event);
+                       // The spool file now holds data that has not been delivered;
+                       // this was set for AuditEventBase events but not for strings.
+                       isPending = true;
+               } catch (Exception ex) {
+                       logger.error("Error writing to file. event=" + event, ex);
+               } finally {
+                       isWriting = false;
+               }
+
+       }
+
+       /**
+        * Spools every string of the collection, in iteration order, then
+        * flushes the spool file.
+        *
+        * @param events event texts to spool
+        */
+       synchronized public void stashLogsString(Collection<String> events) {
+               Iterator<String> remaining = events.iterator();
+               while (remaining.hasNext()) {
+                       stashLogsString(remaining.next());
+               }
+               flush();
+       }
+
+       /**
+        * Returns the writer of the spool file that is currently open, without
+        * opening a new one.
+        *
+        * @return the open writer, or null when no spool file is open
+        */
+       synchronized private PrintWriter getOpenLogFileStream() {
+               final PrintWriter currentWriter = logWriter;
+               return currentWriter;
+       }
+
+       /**
+        * Returns a writer for the spool file that new events should go to.
+        *
+        * First lets closeFileIfNeeded() close the current file if it is due.
+        * Then, if there is no current file, creates a new one, using a name
+        * that exists in neither the spool folder nor the archive folder, and
+        * records it in the index as write_inprogress. If there is a current
+        * index record but no open writer, reopens that file in append mode.
+        *
+        * @return the writer for the current spool file
+        * @throws Exception if the file cannot be opened or the index cannot be
+        *                   saved
+        */
+       synchronized private PrintWriter getLogFileStream() throws Exception {
+               closeFileIfNeeded();
+
+               // Either there are no open log file or the previous one has been rolled
+               // over
+               if (currentWriterIndexRecord == null) {
+                       Date currentTime = new Date();
+                       // Create a new file
+                       String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+                                       currentTime.getTime());
+
+                       // Split the name once. A name without a dot has no extension;
+                       // previously substring(0, -1) threw on the first collision.
+                       int lastDot = fileName.lastIndexOf('.');
+                       String baseName = lastDot < 0 ? fileName : fileName.substring(0,
+                                       lastDot);
+                       String extension = lastDot < 0 ? "" : fileName.substring(lastDot);
+
+                       // Append ".1", ".2", ... before the extension until the name is
+                       // free in both the spool folder and the archive folder.
+                       String newFileName = fileName;
+                       File outLogFile = new File(logFolder, newFileName);
+                       for (int i = 1; outLogFile.exists()
+                                       || new File(archiveFolder, newFileName).exists(); i++) {
+                               newFileName = baseName + "." + i + extension;
+                               outLogFile = new File(logFolder, newFileName);
+                       }
+                       fileName = newFileName;
+                       logger.info("Creating new file. queueName="
+                                       + queueProvider.getName() + ", fileName=" + fileName);
+                       // Open the file
+                       logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+                                       outLogFile)));
+
+                       AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();
+
+                       tmpIndexRecord.id = MiscUtil.generateUniqueId();
+                       tmpIndexRecord.filePath = outLogFile.getPath();
+                       tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
+                       tmpIndexRecord.fileCreateTime = currentTime;
+                       tmpIndexRecord.lastAttempt = true;
+                       currentWriterIndexRecord = tmpIndexRecord;
+                       indexRecords.add(currentWriterIndexRecord);
+                       saveIndexFile();
+
+               } else {
+                       if (logWriter == null) {
+                               // This means the process just started. We need to open the file
+                               // in append mode.
+                               logger.info("Opening existing file for append. queueName="
+                                               + queueProvider.getName() + ", fileName="
+                                               + currentWriterIndexRecord.filePath);
+                               logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+                                               currentWriterIndexRecord.filePath, true)));
+                       }
+               }
+               return logWriter;
+       }
+
+       /**
+        * Closes the spool file being written when it is due, marks its index
+        * record as pending, saves the index and queues the record for the
+        * consumer.
+        *
+        * The file is due when it is the only record in the index, or when it is
+        * older than fileRolloverSec seconds. Does nothing when no file is being
+        * written.
+        *
+        * @throws FileNotFoundException if the index file cannot be opened
+        * @throws IOException           if the index file cannot be saved
+        */
+       synchronized private void closeFileIfNeeded() throws FileNotFoundException,
+                       IOException {
+               if (currentWriterIndexRecord == null) {
+                       return;
+               }
+
+               // Check whether the file needs to rolled
+               boolean closeFile = false;
+               if (indexRecords.size() == 1) {
+                       closeFile = true;
+                       logger.info("Closing file. Only one open file. queueName="
+                                       + queueProvider.getName() + ", fileName="
+                                       + currentWriterIndexRecord.filePath);
+               } else {
+                       long fileAgeMs = System.currentTimeMillis()
+                                       - currentWriterIndexRecord.fileCreateTime.getTime();
+                       // Multiply as long: int arithmetic overflows once
+                       // fileRolloverSec exceeds about 24.8 days.
+                       long rolloverMs = fileRolloverSec * 1000L;
+                       if (fileAgeMs > rolloverMs) {
+                               closeFile = true;
+                               logger.info("Closing file. Rolling over. queueName="
+                                               + queueProvider.getName() + ", fileName="
+                                               + currentWriterIndexRecord.filePath);
+                       }
+               }
+               if (!closeFile) {
+                       return;
+               }
+
+               // Roll the file
+               if (logWriter != null) {
+                       logWriter.flush();
+                       logWriter.close();
+                       logWriter = null;
+               }
+               currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
+               currentWriterIndexRecord.writeCompleteTime = new Date();
+               saveIndexFile();
+               logger.info("Adding file to queue. queueName="
+                               + queueProvider.getName() + ", fileName="
+                               + currentWriterIndexRecord.filePath);
+               indexQueue.add(currentWriterIndexRecord);
+               currentWriterIndexRecord = null;
+       }
+
+       /**
+        * Replaces the in-memory index with the records stored in the index
+        * file. Each non-empty line that does not start with "#" is parsed as
+        * one JSON record.
+        *
+        * @throws IOException if the index file cannot be opened or read
+        */
+       void loadIndexFile() throws IOException {
+               logger.info("Loading index file. fileName=" + indexFile.getPath());
+               BufferedReader br = new BufferedReader(new FileReader(indexFile));
+               try {
+                       indexRecords.clear();
+                       String line;
+                       while ((line = br.readLine()) != null) {
+                               if (!line.isEmpty() && !line.startsWith("#")) {
+                                       AuditIndexRecord record = gson.fromJson(line,
+                                                       AuditIndexRecord.class);
+                                       // fromJson returns null for a line that holds no JSON
+                                       // value (e.g. only whitespace); a null entry would
+                                       // break every later walk over the index.
+                                       if (record != null) {
+                                               indexRecords.add(record);
+                                       }
+                               }
+                       }
+               } finally {
+                       // Close the reader even when reading or parsing fails.
+                       br.close();
+               }
+       }
+
+       /**
+        * Logs every record of the in-memory index, together with whether the
+        * file it points to exists, between a START and an END marker.
+        */
+       synchronized void printIndex() {
+               logger.info("INDEX printIndex() ==== START");
+               for (AuditIndexRecord entry : indexRecords) {
+                       boolean fileExists = new File(entry.filePath).exists();
+                       logger.info("INDEX=" + entry + ", isFileExist=" + fileExists);
+               }
+               logger.info("INDEX printIndex() ==== END");
+       }
+
+       /**
+        * Removes every in-memory index record whose id equals the given
+        * record's id, passes each removed record to appendToDoneFile(), and then
+        * saves the index file.
+        *
+        * @param indexRecord record whose id selects the entries to remove
+        * @throws FileNotFoundException if an index file cannot be opened
+        * @throws IOException           if an index file cannot be written
+        */
+       synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
+                       throws FileNotFoundException, IOException {
+               for (Iterator<AuditIndexRecord> cursor = indexRecords.iterator(); cursor
+                               .hasNext();) {
+                       AuditIndexRecord candidate = cursor.next();
+                       if (!candidate.id.equals(indexRecord.id)) {
+                               continue;
+                       }
+                       logger.info("Removing file from index. file=" + candidate.filePath
+                                       + ", queueName=" + queueProvider.getName()
+                                       + ", consumer=" + consumerProvider.getName());
+
+                       cursor.remove();
+                       appendToDoneFile(candidate);
+               }
+               saveIndexFile();
+       }
+
+       /**
+        * Rewrites the index file with one JSON line per in-memory index record.
+        *
+        * @throws FileNotFoundException if the index file cannot be opened for
+        *                               writing
+        * @throws IOException           if writing the index file failed
+        */
+       synchronized void saveIndexFile() throws FileNotFoundException,
+                       IOException {
+               PrintWriter out = new PrintWriter(indexFile);
+               boolean writeFailed;
+               try {
+                       for (AuditIndexRecord auditIndexRecord : indexRecords) {
+                               out.println(gson.toJson(auditIndexRecord));
+                       }
+                       // PrintWriter never throws on write errors; checkError()
+                       // flushes and reports whether any write failed.
+                       writeFailed = out.checkError();
+               } finally {
+                       // Close the file even when serializing a record fails.
+                       out.close();
+               }
+               if (writeFailed) {
+                       throw new IOException("Error writing index file. fileName="
+                                       + indexFile.getPath());
+               }
+               // printIndex();
+
+       }
+
+       /**
+        * Appends the given index record (as one JSON line) to the done file,
+        * moves the record's log file into the archive folder and then trims
+        * the archive folder down to {@code maxArchiveFiles} log files.
+        * Failures while archiving or trimming are logged and not propagated.
+        *
+        * @param indexRecord
+        *            record of the spool file that has been fully processed
+        * @throws FileNotFoundException
+        *             if the done file cannot be opened for appending
+        * @throws IOException
+        *             if the done file cannot be opened for appending
+        */
+       void appendToDoneFile(AuditIndexRecord indexRecord)
+                       throws FileNotFoundException, IOException {
+               logger.info("Moving to done file. " + indexRecord.filePath
+                               + ", queueName=" + queueProvider.getName() + ", consumer="
+                               + consumerProvider.getName());
+               String line = gson.toJson(indexRecord);
+               PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
+                               indexDoneFile, true)));
+               try {
+                       out.println(line);
+                       out.flush();
+               } finally {
+                       // Release the file handle even if the write fails.
+                       out.close();
+               }
+
+               moveToArchive(indexRecord);
+               deleteOldArchiveFiles();
+       }
+
+       /**
+        * Moves the log file referenced by the record into the archive folder,
+        * keeping its file name. Errors are logged only.
+        */
+       private void moveToArchive(AuditIndexRecord indexRecord) {
+               File logFile = null;
+               File archiveFile = null;
+               try {
+                       logFile = new File(indexRecord.filePath);
+                       archiveFile = new File(archiveFolder, logFile.getName());
+                       logger.info("Moving logFile " + logFile + " to " + archiveFile);
+                       // renameTo() reports failure through its return value, not
+                       // through an exception.
+                       if (!logFile.renameTo(archiveFile)) {
+                               logger.error("Error moving log file to archive folder. logFile="
+                                               + logFile + ", archiveFile=" + archiveFile);
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error moving log file to archive folder. logFile="
+                                       + logFile + ", archiveFile=" + archiveFile, t);
+               }
+       }
+
+       /**
+        * Deletes archived log files, in the order they are listed in the done
+        * file, until the archive folder holds no more than
+        * {@code maxArchiveFiles} files ending in ".log". Errors are logged only.
+        */
+       private void deleteOldArchiveFiles() {
+               File archiveFile = null;
+               try {
+                       File[] logFiles = archiveFolder.listFiles(new FileFilter() {
+                               public boolean accept(File pathname) {
+                                       return pathname.getName().toLowerCase().endsWith(".log");
+                               }
+                       });
+                       // listFiles() returns null when the folder is missing or
+                       // cannot be read.
+                       if (logFiles == null) {
+                               logger.error("Error listing archive folder. archiveFolder="
+                                               + archiveFolder);
+                               return;
+                       }
+                       if (logFiles.length <= maxArchiveFiles) {
+                               return;
+                       }
+
+                       int filesToDelete = logFiles.length - maxArchiveFiles;
+                       BufferedReader br = new BufferedReader(new FileReader(
+                                       indexDoneFile));
+                       try {
+                               int filesDeletedCount = 0;
+                               String doneLine;
+                               while ((doneLine = br.readLine()) != null) {
+                                       if (doneLine.isEmpty() || doneLine.startsWith("#")) {
+                                               continue;
+                                       }
+                                       AuditIndexRecord record = gson.fromJson(doneLine,
+                                                       AuditIndexRecord.class);
+                                       String fileName = new File(record.filePath).getName();
+                                       archiveFile = new File(archiveFolder, fileName);
+                                       if (!archiveFile.exists()) {
+                                               continue;
+                                       }
+                                       logger.info("Deleting archive file " + archiveFile);
+                                       if (!archiveFile.delete()) {
+                                               logger.error("Error deleting archive file. archiveFile="
+                                                               + archiveFile);
+                                               // Count only files that were really removed.
+                                               continue;
+                                       }
+                                       filesDeletedCount++;
+                                       if (filesDeletedCount >= filesToDelete) {
+                                               logger.info("Deleted " + filesDeletedCount + " files");
+                                               break;
+                                       }
+                               }
+                       } finally {
+                               br.close();
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error deleting older archive file. archiveFile="
+                                       + archiveFile, t);
+               }
+       }
+
+       /**
+        * Logs the message at error level unless an error was already logged
+        * through this method within the last {@code errorLogIntervalMS}
+        * milliseconds.
+        *
+        * @param msg
+        *            text to log
+        */
+       void logError(String msg) {
+               long now = System.currentTimeMillis();
+               long sinceLastLog = now - lastErrorLogMS;
+               if (sinceLastLog <= errorLogIntervalMS) {
+                       // Still inside the quiet interval; drop the message.
+                       return;
+               }
+               logger.error(msg);
+               lastErrorLogMS = now;
+       }
+
+       class AuditIndexRecord {
+               String id;
+               String filePath;
+               int linePosition = 0;
+               SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
+               Date fileCreateTime;
+               Date writeCompleteTime;
+               Date doneCompleteTime;
+               Date lastSuccessTime;
+               Date lastFailedTime;
+               int failedAttemptCount = 0;
+               boolean lastAttempt = false;
+
+               @Override
+               public String toString() {
+                       return "AuditIndexRecord [id=" + id + ", filePath=" + 
filePath
+                                       + ", linePosition=" + linePosition + ", 
status=" + status
+                                       + ", fileCreateTime=" + fileCreateTime
+                                       + ", writeCompleteTime=" + 
writeCompleteTime
+                                       + ", doneCompleteTime=" + 
doneCompleteTime
+                                       + ", lastSuccessTime=" + lastSuccessTime
+                                       + ", lastFailedTime=" + lastFailedTime
+                                       + ", failedAttemptCount=" + 
failedAttemptCount
+                                       + ", lastAttempt=" + lastAttempt + "]";
+               }
+
+       }
+
+       /**
+        * Plain holder for a timestamp and a status string.
+        * NOTE(review): presumably describes one delivery attempt of a spool
+        * file; it is not referenced in the visible part of this class, so
+        * confirm its use before relying on it.
+        */
+       class AuditFileSpoolAttempt {
+               // presumably the time the attempt was made - TODO confirm
+               Date attemptTime;
+               // free-form status text; allowed values are not defined here
+               String status;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see java.lang.Runnable#run()
+        */
+       @Override
+       public void run() {
+               while (true) {
+                       try {
+                               // Let's pause between each iteration
+                               if (currentConsumerIndexRecord == null) {
+                                       currentConsumerIndexRecord = 
indexQueue.poll(
+                                                       retryDestinationMS, 
TimeUnit.MILLISECONDS);
+                               } else {
+                                       Thread.sleep(retryDestinationMS);
+                               }
+
+                               if (isDrain) {
+                                       // Need to exit
+                                       break;
+                               }
+                               if (currentConsumerIndexRecord == null) {
+                                       closeFileIfNeeded();
+                                       continue;
+                               }
+
+                               boolean isRemoveIndex = false;
+                               File consumerFile = new File(
+                                               
currentConsumerIndexRecord.filePath);
+                               if (!consumerFile.exists()) {
+                                       logger.error("Consumer file=" + 
consumerFile.getPath()
+                                                       + " not found.");
+                                       printIndex();
+                                       isRemoveIndex = true;
+                               } else {
+                                       // Let's open the file to write
+                                       BufferedReader br = new 
BufferedReader(new FileReader(
+                                                       
currentConsumerIndexRecord.filePath));
+                                       try {
+                                               int startLine = 
currentConsumerIndexRecord.linePosition;
+                                               String line;
+                                               int currLine = 0;
+                                               boolean isResumed = false;
+                                               List<String> lines = new 
ArrayList<String>();
+                                               while ((line = br.readLine()) 
!= null) {
+                                                       currLine++;
+                                                       if (currLine < 
startLine) {
+                                                               continue;
+                                                       }
+                                                       lines.add(line);
+                                                       if (lines.size() == 
queueProvider.getMaxBatchSize()) {
+                                                               boolean ret = 
sendEvent(lines,
+                                                                               
currentConsumerIndexRecord, currLine);
+                                                               if (!ret) {
+                                                                       throw 
new Exception("Destination down");
+                                                               } else {
+                                                                       if 
(!isResumed) {
+                                                                               
logger.info("Started writing to destination. file="
+                                                                               
                + currentConsumerIndexRecord.filePath
+                                                                               
                + ", queueName="
+                                                                               
                + queueProvider.getName()
+                                                                               
                + ", consumer="
+                                                                               
                + consumerProvider.getName());
+                                                                       }
+                                                               }
+                                                               lines.clear();
+                                                       }
+                                               }
+                                               if (lines.size() > 0) {
+                                                       boolean ret = 
sendEvent(lines,
+                                                                       
currentConsumerIndexRecord, currLine);
+                                                       if (!ret) {
+                                                               throw new 
Exception("Destination down");
+                                                       } else {
+                                                               if (!isResumed) 
{
+                                                                       
logger.info("Started writing to destination. file="
+                                                                               
        + currentConsumerIndexRecord.filePath
+                                                                               
        + ", queueName="
+                                                                               
        + queueProvider.getName()
+                                                                               
        + ", consumer="
+                                                                               
        + consumerProvider.getName());
+                                                               }
+                                                       }
+                                                       lines.clear();
+                                               }
+                                               logger.info("Done reading file. 
file="
+                                                               + 
currentConsumerIndexRecord.filePath
+                                                               + ", 
queueName=" + queueProvider.getName()
+                                                               + ", consumer=" 
+ consumerProvider.getName());
+                                               // The entire file is read
+                                               
currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
+                                               
currentConsumerIndexRecord.doneCompleteTime = new Date();
+                                               
currentConsumerIndexRecord.lastAttempt = true;
+
+                                               isRemoveIndex = true;
+                                       } catch (Exception ex) {
+                                               isDestDown = true;
+                                               logError("Destination down. 
queueName="
+                                                               + 
queueProvider.getName() + ", consumer="
+                                                               + 
consumerProvider.getName());
+                                               lastAttemptTime = 
System.currentTimeMillis();
+                                               // Update the index file
+                                               
currentConsumerIndexRecord.lastFailedTime = new Date();
+                                               
currentConsumerIndexRecord.failedAttemptCount++;
+                                               
currentConsumerIndexRecord.lastAttempt = false;
+                                               saveIndexFile();
+                                       } finally {
+                                               br.close();
+                                       }
+                               }
+                               if (isRemoveIndex) {
+                                       // Remove this entry from index
+                                       
removeIndexRecord(currentConsumerIndexRecord);
+                                       currentConsumerIndexRecord = null;
+                                       closeFileIfNeeded();
+                               }
+                       } catch (Throwable t) {
+                               logger.error("Exception in destination writing 
thread.", t);
+                       }
+               }
+               logger.info("Exiting file spooler. provider=" + 
queueProvider.getName()
+                               + ", consumer=" + consumerProvider.getName());
+       }
+
+       /**
+        * Hands one batch of JSON lines to the consumer. On success the index
+        * record is advanced to {@code currLine} and the index file is saved.
+        *
+        * @param lines
+        *            JSON lines to deliver
+        * @param indexRecord
+        *            index record of the spool file the lines were read from
+        * @param currLine
+        *            line number of the last line contained in {@code lines}
+        * @return {@code true} if the consumer accepted the batch;
+        *         {@code false} if it rejected the batch or threw
+        */
+       private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
+                       int currLine) {
+               // Start pessimistic: if logJSON() throws, the batch was not
+               // delivered and the caller has to retry it.
+               boolean ret = false;
+               try {
+                       ret = consumerProvider.logJSON(lines);
+                       if (!ret) {
+                               // Need to log error after fixed interval
+                               logError("Error sending logs to consumer. provider="
+                                               + queueProvider.getName() + ", consumer="
+                                               + consumerProvider.getName());
+                       } else {
+                               // Update index and save
+                               indexRecord.linePosition = currLine;
+                               indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
+                               indexRecord.lastSuccessTime = new Date();
+                               indexRecord.lastAttempt = true;
+                               saveIndexFile();
+
+                               if (isDestDown) {
+                                       isDestDown = false;
+                                       logger.info("Destination up now. " + indexRecord.filePath
+                                                       + ", queueName=" + queueProvider.getName()
+                                                       + ", consumer=" + consumerProvider.getName());
+                               }
+                       }
+               } catch (Throwable t) {
+                       // ret keeps whatever logJSON() returned: a failure while
+                       // saving the index does not undo a delivered batch.
+                       logger.error("Error while sending logs to consumer. provider="
+                                       + queueProvider.getName() + ", consumer="
+                                       + consumerProvider.getName() + ", log=" + lines, t);
+               }
+
+               return ret;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
new file mode 100644
index 0000000..3ef3e30
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+/**
+ * This exception should be thrown only when there is an error in the message
+ * itself, e.g. an invalid field type. Don't throw this exception if there is
+ * a transient error.
+ */
+public class AuditMessageException extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates the exception without a detail message or cause.
+        */
+       public AuditMessageException() {
+       }
+
+       /**
+        * Creates the exception with a detail message.
+        *
+        * @param message
+        *            detail message, passed to {@link Exception#Exception(String)}
+        */
+       public AuditMessageException(String message) {
+               super(message);
+       }
+
+       /**
+        * Creates the exception with a cause.
+        *
+        * @param cause
+        *            underlying cause, passed to
+        *            {@link Exception#Exception(Throwable)}
+        */
+       public AuditMessageException(Throwable cause) {
+               super(cause);
+       }
+
+       /**
+        * Creates the exception with a detail message and a cause.
+        *
+        * @param message
+        *            detail message
+        * @param cause
+        *            underlying cause
+        */
+       public AuditMessageException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       /**
+        * Creates the exception with all four arguments forwarded unchanged to
+        * the matching {@link Exception} constructor.
+        *
+        * @param message
+        *            detail message
+        * @param cause
+        *            underlying cause
+        * @param enableSuppression
+        *            forwarded to the superclass constructor
+        * @param writableStackTrace
+        *            forwarded to the superclass constructor
+        */
+       public AuditMessageException(String message, Throwable cause,
+                       boolean enableSuppression, boolean writableStackTrace) {
+               super(message, cause, enableSuppression, writableStackTrace);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
index 47c2d7f..0e38624 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
@@ -18,18 +18,38 @@
 
 package org.apache.ranger.audit.provider;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.ranger.audit.model.AuditEventBase;
 
 public interface AuditProvider {
-       public void log(AuditEventBase event);
+       public boolean log(AuditEventBase event);
+       public boolean log(Collection<AuditEventBase> events);  
+
+       public boolean logJSON(String event);
+       public boolean logJSON(Collection<String> events);      
 
     public void init(Properties prop);
+    public void init(Properties prop, String basePropertyName);
     public void start();
     public void stop();
     public void waitToComplete();
+    public void waitToComplete(long timeout);
+
+    /**
+     * Name for this provider. Used only during logging. Uniqueness is not 
guaranteed
+     */
+    public String getName();
 
+    /**
+     * Indicates whether this AuditProvider is in the state of shutdown.
+     * @return true if this AuditProvider is in the state of shutdown
+     */
+    public boolean isDrain();
+    
+    public int getMaxBatchSize();
+    public int getMaxBatchInterval();
        public boolean isFlushPending();
        public long    getLastFlushTime();
     public void    flush();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index bb8fa6d..13b3142 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -28,7 +28,6 @@ import 
org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;
 import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
 
-
 /*
  * TODO:
  * 1) Flag to enable/disable audit logging
@@ -37,22 +36,25 @@ import 
org.apache.ranger.audit.provider.solr.SolrAuditProvider;
  */
 
 public class AuditProviderFactory {
-       private static final Log LOG = 
LogFactory.getLog(AuditProviderFactory.class);
-
-       private static final String AUDIT_IS_ENABLED_PROP       = 
"xasecure.audit.is.enabled" ;
-       private static final String AUDIT_DB_IS_ENABLED_PROP    = 
"xasecure.audit.db.is.enabled" ;
-       private static final String AUDIT_HDFS_IS_ENABLED_PROP  = 
"xasecure.audit.hdfs.is.enabled";
-       private static final String AUDIT_LOG4J_IS_ENABLED_PROP = 
"xasecure.audit.log4j.is.enabled" ;
-       private static final String AUDIT_KAFKA_IS_ENABLED_PROP = 
"xasecure.audit.kafka.is.enabled";
-       private static final String AUDIT_SOLR_IS_ENABLED_PROP = 
"xasecure.audit.solr.is.enabled";
-       
-       private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 
1024;
-       private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 
1000;
-       
+       private static final Log LOG = LogFactory
+                       .getLog(AuditProviderFactory.class);
+
+       public static final String AUDIT_IS_ENABLED_PROP = 
"xasecure.audit.is.enabled";
+       public static final String AUDIT_DB_IS_ENABLED_PROP = 
"xasecure.audit.db.is.enabled";
+       public static final String AUDIT_HDFS_IS_ENABLED_PROP = 
"xasecure.audit.hdfs.is.enabled";
+       public static final String AUDIT_LOG4J_IS_ENABLED_PROP = 
"xasecure.audit.log4j.is.enabled";
+       public static final String AUDIT_KAFKA_IS_ENABLED_PROP = 
"xasecure.audit.kafka.is.enabled";
+       public static final String AUDIT_SOLR_IS_ENABLED_PROP = 
"xasecure.audit.solr.is.enabled";
+
+       public static final String AUDIT_DEST_BASE = 
"xasecure.audit.destination";
+
+       public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
+       public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 
1000;
+
        private static AuditProviderFactory sFactory;
 
        private AuditProvider mProvider = null;
-       private boolean       mInitDone = false;
+       private boolean mInitDone = false;
 
        private AuditProviderFactory() {
                LOG.info("AuditProviderFactory: creating..");
@@ -61,9 +63,9 @@ public class AuditProviderFactory {
        }
 
        public static AuditProviderFactory getInstance() {
-               if(sFactory == null) {
-                       synchronized(AuditProviderFactory.class) {
-                               if(sFactory == null) {
+               if (sFactory == null) {
+                       synchronized (AuditProviderFactory.class) {
+                               if (sFactory == null) {
                                        sFactory = new AuditProviderFactory();
                                }
                        }
@@ -75,7 +77,7 @@ public class AuditProviderFactory {
        public static AuditProvider getAuditProvider() {
                return AuditProviderFactory.getInstance().getProvider();
        }
-       
+
        public AuditProvider getProvider() {
                return mProvider;
        }
@@ -86,133 +88,301 @@ public class AuditProviderFactory {
 
        public synchronized void init(Properties props, String appType) {
                LOG.info("AuditProviderFactory: initializing..");
-               
-               if(mInitDone) {
-                       LOG.warn("AuditProviderFactory.init(): already 
initialized!", new Exception());
+
+               if (mInitDone) {
+                       LOG.warn("AuditProviderFactory.init(): already 
initialized!",
+                                       new Exception());
 
                        return;
                }
                mInitDone = true;
-               
-               MiscUtil.setApplicationType(appType);
 
-               boolean isEnabled             = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false);
-               boolean isAuditToDbEnabled    = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
-               boolean isAuditToHdfsEnabled  = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
-               boolean isAuditToLog4jEnabled = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
-               boolean isAuditToKafkaEnabled  = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false);
-               boolean isAuditToSolrEnabled  = 
BaseAuditProvider.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false);
+               MiscUtil.setApplicationType(appType);
 
-               if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled 
|| isAuditToKafkaEnabled || isAuditToLog4jEnabled || isAuditToSolrEnabled)) {
-                       LOG.info("AuditProviderFactory: Audit not enabled..");
+               boolean isEnabled = MiscUtil.getBooleanProperty(props,
+                               AUDIT_IS_ENABLED_PROP, false);
+               boolean isAuditToDbEnabled = MiscUtil.getBooleanProperty(props,
+                               AUDIT_DB_IS_ENABLED_PROP, false);
+               boolean isAuditToHdfsEnabled = 
MiscUtil.getBooleanProperty(props,
+                               AUDIT_HDFS_IS_ENABLED_PROP, false);
+               boolean isAuditToLog4jEnabled = 
MiscUtil.getBooleanProperty(props,
+                               AUDIT_LOG4J_IS_ENABLED_PROP, false);
+               boolean isAuditToKafkaEnabled = 
MiscUtil.getBooleanProperty(props,
+                               AUDIT_KAFKA_IS_ENABLED_PROP, false);
+               boolean isAuditToSolrEnabled = 
MiscUtil.getBooleanProperty(props,
+                               AUDIT_SOLR_IS_ENABLED_PROP, false);
 
-                       mProvider = getDefaultProvider();
+               List<AuditProvider> providers = new ArrayList<AuditProvider>();
 
-                       return;
+               // Trace which audit properties were supplied. Only the keys are
+               // logged, and only at debug level: the values may include
+               // credentials that should not be written to the log.
+               for (Object propNameObj : props.keySet()) {
+                       LOG.debug("AuditProviderFactory: found property " + propNameObj);
                }
 
-               List<AuditProvider> providers = new ArrayList<AuditProvider>();
+               // Process new audit configurations
+               List<String> destNameList = new ArrayList<String>();
 
-               if(isAuditToDbEnabled) {
-                       LOG.info("DbAuditProvider is enabled");
-                       DbAuditProvider dbProvider = new DbAuditProvider();
-
-                       boolean isAuditToDbAsync = 
BaseAuditProvider.getBooleanProperty(props, 
DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
+               // Only keys of the form "<AUDIT_DEST_BASE>.<name>" enable a
+               // destination. The prefix has to be matched explicitly: a length
+               // check alone lets unrelated keys through, e.g.
+               // "xasecure.audit.db.is.enabled" would be sliced into the bogus
+               // destination name "d".
+               String destKeyPrefix = AUDIT_DEST_BASE + ".";
+               for (Object propNameObj : props.keySet()) {
+                       String propName = propNameObj.toString();
+                       if (!propName.startsWith(destKeyPrefix)
+                                       || propName.length() == destKeyPrefix.length()) {
+                               continue;
+                       }
+                       String destName = propName.substring(destKeyPrefix.length());
+                       // Skip keys whose remainder has more than one "."-separated
+                       // part; only "<base>.<name>" itself switches a destination on.
+                       List<String> splits = MiscUtil.toArray(destName, ".");
+                       if (splits.size() > 1) {
+                               continue;
+                       }
+                       // getProperty() returns null when the key or value stored in
+                       // the table is not a String.
+                       String value = props.getProperty(propName);
+                       if (value == null) {
+                               continue;
+                       }
+                       if (value.equalsIgnoreCase("enable")
+                                       || value.equalsIgnoreCase("true")) {
+                               destNameList.add(destName);
+                               LOG.info("Audit destination " + propName + " is set to "
+                                               + value);
+                       }
+               }
 
-                       if(isAuditToDbAsync) {
-                               int maxQueueSize     = 
BaseAuditProvider.getIntProperty(props, 
DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP, 
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-                               int maxFlushInterval = 
BaseAuditProvider.getIntProperty(props, 
DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, 
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+               for (String destName : destNameList) {
+                       String destPropPrefix = AUDIT_DEST_BASE + "." + 
destName;
+                       AuditProvider destProvider = 
getProviderFromConfig(props,
+                                       destPropPrefix, destName);
 
-                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("DbAuditProvider", maxQueueSize, maxFlushInterval, 
dbProvider);
+                       if (destProvider != null) {
+                               destProvider.init(props, destPropPrefix);
 
-                               providers.add(asyncProvider);
-                       } else {
-                               providers.add(dbProvider);
+                               String queueName = 
MiscUtil.getStringProperty(props,
+                                               destPropPrefix + "." + 
BaseAuditProvider.PROP_QUEUE);
+                               if( queueName == null || queueName.isEmpty()) {
+                                       queueName = "batch";
+                               }
+                               if (queueName != null && !queueName.isEmpty()
+                                               && 
!queueName.equalsIgnoreCase("none")) {
+                                       String queuePropPrefix = destPropPrefix 
+ "." + queueName;
+                                       AuditProvider queueProvider = 
getProviderFromConfig(props,
+                                                       queuePropPrefix, 
queueName);
+                                       if (queueProvider != null) {
+                                               if (queueProvider instanceof 
BaseAuditProvider) {
+                                                       BaseAuditProvider 
qProvider = (BaseAuditProvider) queueProvider;
+                                                       
qProvider.setConsumer(destProvider);
+                                                       qProvider.init(props, 
queuePropPrefix);
+                                                       
providers.add(queueProvider);
+                                               } else {
+                                                       LOG.fatal("Provider 
queue doesn't extend BaseAuditProvider destination "
+                                                                       + 
destName
+                                                                       + " 
can't be created. queueName="
+                                                                       + 
queueName);
+                                               }
+                                       } else {
+                                               LOG.fatal("Queue provider for 
destination " + destName
+                                                               + " can't be 
created. queueName=" + queueName);
+                                       }
+                               } else {
+                                       LOG.info("Audit destination " + 
destProvider.getName()
+                                                       + " added to provider 
list");
+                                       providers.add(destProvider);
+                               }
                        }
                }
+               if (providers.size() > 0) {
+                       LOG.info("Using v2 audit configuration");
+                       AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
+                       String propPrefix = 
BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "async";
+                       asyncQueue.init(props, propPrefix);
+
+                       if (providers.size() == 1) {
+                               asyncQueue.setConsumer(providers.get(0));
+                       } else {
+                               MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
+                               multiDestProvider.init(props);
+                               multiDestProvider.addAuditProviders(providers);
+                               asyncQueue.setConsumer(multiDestProvider);
+                       }
 
-               if(isAuditToHdfsEnabled) {
-                       LOG.info("HdfsAuditProvider is enabled");
+                       mProvider = asyncQueue;
+                       mProvider.start();
+               } else {
+                       LOG.info("No v2 audit configuration found. Trying v1 
audit configurations");
+                       if (!isEnabled
+                                       || !(isAuditToDbEnabled || 
isAuditToHdfsEnabled
+                                                       || 
isAuditToKafkaEnabled || isAuditToLog4jEnabled
+                                                       || isAuditToSolrEnabled 
|| providers.size() == 0)) {
+                               LOG.info("AuditProviderFactory: Audit not 
enabled..");
 
-                       HdfsAuditProvider hdfsProvider = new 
HdfsAuditProvider();
+                               mProvider = getDefaultProvider();
 
-                       boolean isAuditToHdfsAsync = 
BaseAuditProvider.getBooleanProperty(props, 
HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
+                               return;
+                       }
 
-                       if(isAuditToHdfsAsync) {
-                               int maxQueueSize     = 
BaseAuditProvider.getIntProperty(props, 
HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, 
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-                               int maxFlushInterval = 
BaseAuditProvider.getIntProperty(props, 
HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, 
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+                       if (isAuditToDbEnabled) {
+                               LOG.info("DbAuditProvider is enabled");
+                               DbAuditProvider dbProvider = new 
DbAuditProvider();
 
-                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, 
hdfsProvider);
+                               boolean isAuditToDbAsync = 
MiscUtil.getBooleanProperty(props,
+                                               
DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
 
-                               providers.add(asyncProvider);
-                       } else {
-                               providers.add(hdfsProvider);
-                       }
-               }
+                               if (isAuditToDbAsync) {
+                                       int maxQueueSize = 
MiscUtil.getIntProperty(props,
+                                                       
DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP,
+                                                       
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+                                       int maxFlushInterval = 
MiscUtil.getIntProperty(props,
+                                                       
DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP,
+                                                       
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
 
-               if(isAuditToKafkaEnabled) {
-                       LOG.info("KafkaAuditProvider is enabled");
-                       KafkaAuditProvider kafkaProvider = new 
KafkaAuditProvider();
-                       kafkaProvider.init(props);
-                       
-                       if( kafkaProvider.isAsync()) {
-                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), 
kafkaProvider.getMaxFlushInterval(), kafkaProvider);
-                               providers.add(asyncProvider);
-                       } else {
-                               providers.add(kafkaProvider);
-                       }
-               }
+                                       AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
+                                                       "DbAuditProvider", 
maxQueueSize, maxFlushInterval,
+                                                       dbProvider);
 
-               if(isAuditToSolrEnabled) {
-                       LOG.info("SolrAuditProvider is enabled");
-                       SolrAuditProvider solrProvider = new 
SolrAuditProvider();
-                       solrProvider.init(props);
-                       
-                       if( solrProvider.isAsync()) {
-                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("MySolrAuditProvider", solrProvider.getMaxQueueSize(), 
solrProvider.getMaxFlushInterval(), solrProvider);
-                               providers.add(asyncProvider);
-                       } else {
-                               providers.add(solrProvider);
+                                       providers.add(asyncProvider);
+                               } else {
+                                       providers.add(dbProvider);
+                               }
                        }
-               }
 
-               if(isAuditToLog4jEnabled) {
-                       Log4jAuditProvider log4jProvider = new 
Log4jAuditProvider();
+                       if (isAuditToHdfsEnabled) {
+                               LOG.info("HdfsAuditProvider is enabled");
 
-                       boolean isAuditToLog4jAsync = 
BaseAuditProvider.getBooleanProperty(props, 
Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, false);
+                               HdfsAuditProvider hdfsProvider = new 
HdfsAuditProvider();
 
-                       if(isAuditToLog4jAsync) {
-                               int maxQueueSize     = 
BaseAuditProvider.getIntProperty(props, 
Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, 
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-                               int maxFlushInterval = 
BaseAuditProvider.getIntProperty(props, 
Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, 
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+                               boolean isAuditToHdfsAsync = 
MiscUtil.getBooleanProperty(props,
+                                               
HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
 
-                               AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, 
log4jProvider);
+                               if (isAuditToHdfsAsync) {
+                                       int maxQueueSize = 
MiscUtil.getIntProperty(props,
+                                                       
HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP,
+                                                       
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+                                       int maxFlushInterval = MiscUtil
+                                                       .getIntProperty(
+                                                                       props,
+                                                                       
HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP,
+                                                                       
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+                                       AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
+                                                       "HdfsAuditProvider", 
maxQueueSize,
+                                                       maxFlushInterval, 
hdfsProvider);
+
+                                       providers.add(asyncProvider);
+                               } else {
+                                       providers.add(hdfsProvider);
+                               }
+                       }
 
-                               providers.add(asyncProvider);
+                       if (isAuditToKafkaEnabled) {
+                               LOG.info("KafkaAuditProvider is enabled");
+                               KafkaAuditProvider kafkaProvider = new 
KafkaAuditProvider();
+                               kafkaProvider.init(props);
+
+                               if (kafkaProvider.isAsync()) {
+                                       AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
+                                                       "MyKafkaAuditProvider",
+                                                       
kafkaProvider.getMaxQueueSize(),
+                                                       
kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+                                       providers.add(asyncProvider);
+                               } else {
+                                       providers.add(kafkaProvider);
+                               }
+                       }
+
+                       if (isAuditToSolrEnabled) {
+                               LOG.info("SolrAuditProvider is enabled");
+                               SolrAuditProvider solrProvider = new 
SolrAuditProvider();
+                               solrProvider.init(props);
+
+                               if (solrProvider.isAsync()) {
+                                       AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
+                                                       "MySolrAuditProvider",
+                                                       
solrProvider.getMaxQueueSize(),
+                                                       
solrProvider.getMaxBatchInterval(), solrProvider);
+                                       providers.add(asyncProvider);
+                               } else {
+                                       providers.add(solrProvider);
+                               }
+                       }
+
+                       if (isAuditToLog4jEnabled) {
+                               Log4jAuditProvider log4jProvider = new 
Log4jAuditProvider();
+
+                               boolean isAuditToLog4jAsync = 
MiscUtil.getBooleanProperty(
+                                               props, 
Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP,
+                                               false);
+
+                               if (isAuditToLog4jAsync) {
+                                       int maxQueueSize = 
MiscUtil.getIntProperty(props,
+                                                       
Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP,
+                                                       
AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+                                       int maxFlushInterval = MiscUtil
+                                                       .getIntProperty(
+                                                                       props,
+                                                                       
Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP,
+                                                                       
AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+                                       AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
+                                                       "Log4jAuditProvider", 
maxQueueSize,
+                                                       maxFlushInterval, 
log4jProvider);
+
+                                       providers.add(asyncProvider);
+                               } else {
+                                       providers.add(log4jProvider);
+                               }
+                       }
+                       if (providers.size() == 0) {
+                               mProvider = getDefaultProvider();
+                       } else if (providers.size() == 1) {
+                               mProvider = providers.get(0);
                        } else {
-                               providers.add(log4jProvider);
+                               MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
+
+                               multiDestProvider.addAuditProviders(providers);
+
+                               mProvider = multiDestProvider;
                        }
-               }
 
-               if(providers.size() == 0) {
-                       mProvider = getDefaultProvider();
-               } else if(providers.size() == 1) {
-                       mProvider = providers.get(0);
-               } else {
-                       MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
-                       
-                       multiDestProvider.addAuditProviders(providers);
-                       
-                       mProvider = multiDestProvider;
+                       mProvider.init(props);
+                       mProvider.start();
                }
-               
-               mProvider.init(props);
-               mProvider.start();
 
                JVMShutdownHook jvmShutdownHook = new 
JVMShutdownHook(mProvider);
 
-           Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+               Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+       }
+
+       /**
+        * Creates the audit provider for one destination or queue.
+        * <p>
+        * If the property
+        * {@code <propPrefix>.<BaseAuditProvider.PROP_CLASS_NAME>} is set, that
+        * class is loaded and instantiated reflectively. Otherwise
+        * {@code providerName} is matched against the built-in short names.
+        * The returned provider has not had {@code init} called on it.
+        *
+        * @param props        audit configuration
+        * @param propPrefix   property prefix of the destination or queue
+        * @param providerName short name used when no class name is configured
+        * @return the new provider, or {@code null} if the configured class
+        *         could not be instantiated or the name is not recognised
+        */
+       private AuditProvider getProviderFromConfig(Properties props,
+                       String propPrefix, String providerName) {
+               AuditProvider provider = null;
+               String className = MiscUtil.getStringProperty(props, propPrefix + "."
+                               + BaseAuditProvider.PROP_CLASS_NAME);
+               if (className != null && !className.isEmpty()) {
+                       try {
+                               provider = (AuditProvider) Class.forName(className)
+                                               .newInstance();
+                       } catch (Exception e) {
+                               // Hand the exception to the logger so the log shows why
+                               // the class could not be loaded, created or cast.
+                               LOG.fatal("Can't instantiate audit class for providerName="
+                                               + providerName + ", className=" + className, e);
+                       }
+               } else {
+                       if (providerName.equals("file")) {
+                               provider = new FileAuditDestination();
+                       } else if (providerName.equalsIgnoreCase("hdfs")) {
+                               provider = new HDFSAuditDestination();
+                       } else if (providerName.equals("solr")) {
+                               provider = new SolrAuditProvider();
+                       } else if (providerName.equals("kafka")) {
+                               provider = new KafkaAuditProvider();
+                       } else if (providerName.equals("db")) {
+                               provider = new DbAuditProvider();
+                       } else if (providerName.equals("log4j")) {
+                               provider = new Log4jAuditProvider();
+                       } else if (providerName.equals("batch")) {
+                               provider = new AuditBatchProcessor();
+                       } else if (providerName.equals("async")) {
+                               provider = new AuditAsyncQueue();
+                       } else {
+                               LOG.error("Provider name doesn't have any class "
+                                               + "associated with it. providerName="
+                                               + providerName);
+                       }
+               }
+               return provider;
        }
-       
+
        private AuditProvider getDefaultProvider() {
                return new DummyAuditProvider();
        }
@@ -227,6 +397,6 @@ public class AuditProviderFactory {
                public void run() {
                        mProvider.waitToComplete();
                        mProvider.stop();
-           }
+               }
        }
 }

Reply via email to