RANGER-397 - Implement reliable streaming audits to configurable
destinations - Incorporate Review Feedback

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4f3cea22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4f3cea22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4f3cea22

Branch: refs/heads/master
Commit: 4f3cea223b9bb717577732bf050bc78f16e94a69
Parents: 42a0e25
Author: Don Bosco Durai <[email protected]>
Authored: Wed Apr 22 09:49:01 2015 -0700
Committer: Don Bosco Durai <[email protected]>
Committed: Wed Apr 22 09:49:01 2015 -0700

----------------------------------------------------------------------
 .../audit/destination/AuditDestination.java     |  32 +-
 .../audit/destination/FileAuditDestination.java |  33 +-
 .../audit/destination/HDFSAuditDestination.java |  50 ++-
 .../ranger/audit/model/AuditEventBase.java      |   4 +-
 .../audit/provider/AsyncAuditProvider.java      |  60 +--
 .../ranger/audit/provider/AuditHandler.java     |  46 ++
 .../ranger/audit/provider/AuditProvider.java    |  56 ---
 .../audit/provider/AuditProviderFactory.java    | 142 +++---
 .../ranger/audit/provider/BaseAuditHandler.java | 271 ++++++++++++
 .../audit/provider/BaseAuditProvider.java       | 432 -------------------
 .../audit/provider/BufferedAuditProvider.java   |  12 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  10 -
 .../audit/provider/DummyAuditProvider.java      |  35 +-
 .../audit/provider/Log4jAuditProvider.java      |   2 -
 .../audit/provider/MultiDestAuditProvider.java  |  59 +--
 .../provider/kafka/KafkaAuditProvider.java      |  22 +-
 .../audit/provider/solr/SolrAuditProvider.java  |  33 +-
 .../ranger/audit/queue/AuditAsyncQueue.java     |  34 +-
 .../ranger/audit/queue/AuditBatchQueue.java     |  26 +-
 .../ranger/audit/queue/AuditFileSpool.java      |  57 ++-
 .../apache/ranger/audit/queue/AuditQueue.java   | 174 ++++++++
 .../ranger/audit/queue/AuditSummaryQueue.java   |  49 +--
 .../apache/ranger/audit/test/TestEvents.java    |   4 +-
 .../org/apache/ranger/audit/TestAuditQueue.java |  98 +++--
 .../org/apache/ranger/audit/TestConsumer.java   |  46 +-
 25 files changed, 815 insertions(+), 972 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
index 25c0220..9db8937 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
@@ -23,13 +23,13 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
 
 /**
  * This class needs to be extended by anyone who wants to build custom
  * destination
  */
-public abstract class AuditDestination extends BaseAuditProvider {
+public abstract class AuditDestination extends BaseAuditHandler {
        private static final Log logger = 
LogFactory.getLog(AuditDestination.class);
 
        public AuditDestination() {
@@ -51,21 +51,31 @@ public abstract class AuditDestination extends BaseAuditProvider {
        /*
         * (non-Javadoc)
         * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
         */
        @Override
-       public boolean isFlushPending() {
-               return false;
+       public void flush() {
+
        }
 
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-        */
        @Override
-       public void flush() {
+       public void start() {
+               
+       }
+
+       @Override
+       public void stop() {
+               
+       }
 
+       @Override
+       public void waitToComplete() {
+               
        }
 
+       @Override
+       public void waitToComplete(long timeout) {
+               
+       }
+       
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
index 1ccfd5f..a132cdf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
@@ -21,9 +21,7 @@ package org.apache.ranger.audit.destination;
 
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -107,7 +105,12 @@ public class FileAuditDestination extends AuditDestination {
        }
 
        @Override
-       public boolean logJSON(Collection<String> events) {
+       synchronized public boolean logJSON(Collection<String> events) {
+               if (isStopped) {
+                       logError("log() called after stop was requested. name=" 
+ getName());
+                       return false;
+               }
+
                try {
                        PrintWriter out = getLogFileStream();
                        for (String event : events) {
@@ -128,7 +131,7 @@ public class FileAuditDestination extends AuditDestination {
         * 
org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
         */
        @Override
-       synchronized public boolean log(Collection<AuditEventBase> events) {
+       public boolean log(Collection<AuditEventBase> events) {
                if (isStopped) {
                        logError("log() called after stop was requested. name=" 
+ getName());
                        return false;
@@ -158,11 +161,16 @@ public class FileAuditDestination extends AuditDestination {
 
        @Override
        synchronized public void stop() {
+               isStopped = true;
                if (logWriter != null) {
-                       logWriter.flush();
-                       logWriter.close();
+                       try {
+                               logWriter.flush();
+                               logWriter.close();
+                       } catch (Throwable t) {
+                               logger.error("Error on closing log writter. 
Exception will be ignored. name="
+                                               + getName() + ", fileName=" + 
currentFileName);
+                       }
                        logWriter = null;
-                       isStopped = true;
                }
        }
 
@@ -214,15 +222,20 @@ public class FileAuditDestination extends AuditDestination {
                return logWriter;
        }
 
-       private void closeFileIfNeeded() throws FileNotFoundException, 
IOException {
+       private void closeFileIfNeeded() {
                if (logWriter == null) {
                        return;
                }
                if (System.currentTimeMillis() - fileCreateTime.getTime() > 
fileRolloverSec * 1000) {
                        logger.info("Closing file. Rolling over. name=" + 
getName()
                                        + ", fileName=" + currentFileName);
-                       logWriter.flush();
-                       logWriter.close();
+                       try {
+                               logWriter.flush();
+                               logWriter.close();
+                       } catch (Throwable t) {
+                               logger.error("Error on closing log writter. 
Exception will be ignored. name="
+                                               + getName() + ", fileName=" + 
currentFileName);
+                       }
                        logWriter = null;
                        currentFileName = null;
                }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 706eb8e..6ca4fce 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -74,6 +74,12 @@ public class HDFSAuditDestination extends AuditDestination {
                // Initial folder and file properties
                String logFolderProp = MiscUtil.getStringProperty(props, 
propPrefix
                                + "." + PROP_HDFS_DIR);
+               if (logFolderProp == null || logFolderProp.isEmpty()) {
+                       logger.fatal("File destination folder is not 
configured. Please set "
+                                       + propPrefix + "." + PROP_HDFS_DIR + ". 
name=" + getName());
+                       return;
+               }
+
                String logSubFolder = MiscUtil.getStringProperty(props, 
propPrefix
                                + "." + PROP_HDFS_SUBDIR);
                if (logSubFolder == null || logSubFolder.isEmpty()) {
@@ -89,12 +95,6 @@ public class HDFSAuditDestination extends AuditDestination {
                        logFileNameFormat = 
"%app-type%_ranger_audit_%hostname%" + ".log";
                }
 
-               if (logFolderProp == null || logFolderProp.isEmpty()) {
-                       logger.fatal("File destination folder is not 
configured. Please set "
-                                       + propPrefix + "." + PROP_HDFS_DIR + ". 
name=" + getName());
-                       return;
-               }
-
                logFolder = logFolderProp + "/" + logSubFolder;
                logger.info("logFolder=" + logFolder + ", destName=" + 
getName());
                logger.info("logFileNameFormat=" + logFileNameFormat + ", 
destName="
@@ -104,7 +104,12 @@ public class HDFSAuditDestination extends AuditDestination {
        }
 
        @Override
-       public boolean logJSON(Collection<String> events) {
+       synchronized public boolean logJSON(Collection<String> events) {
+               if (isStopped) {
+                       logError("log() called after stop was requested. name=" 
+ getName());
+                       return false;
+               }
+
                try {
                        PrintWriter out = getLogFileStream();
                        for (String event : events) {
@@ -125,7 +130,7 @@ public class HDFSAuditDestination extends AuditDestination {
         * 
org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
         */
        @Override
-       synchronized public boolean log(Collection<AuditEventBase> events) {
+       public boolean log(Collection<AuditEventBase> events) {
                if (isStopped) {
                        logError("log() called after stop was requested. name=" 
+ getName());
                        return false;
@@ -155,15 +160,16 @@ public class HDFSAuditDestination extends AuditDestination {
 
        @Override
        synchronized public void stop() {
-               try {
-                       if (logWriter != null) {
+               isStopped = true;
+               if (logWriter != null) {
+                       try {
                                logWriter.flush();
                                logWriter.close();
-                               logWriter = null;
-                               isStopped = true;
+                       } catch (Throwable t) {
+                               logger.error("Error on closing log writter. 
Exception will be ignored. name="
+                                               + getName() + ", fileName=" + 
currentFileName);
                        }
-               } catch (Throwable t) {
-                       logger.error("Error closing HDFS file.", t);
+                       logWriter = null;
                }
        }
 
@@ -198,9 +204,11 @@ public class HDFSAuditDestination extends AuditDestination {
                                String extension = 
defaultPath.substring(lastDot);
                                fullPath = baseName + "." + i + extension;
                                hdfPath = new Path(fullPath);
-                               logger.info("Checking whether log file exists. 
hdfPath=" + fullPath);
+                               logger.info("Checking whether log file exists. 
hdfPath="
+                                               + fullPath);
                        }
-                       logger.info("Log file doesn't exists. Will create and 
use it. hdfPath=" + fullPath);
+                       logger.info("Log file doesn't exists. Will create and 
use it. hdfPath="
+                                       + fullPath);
                        // Create parent folders
                        createParents(hdfPath, fileSystem);
 
@@ -234,8 +242,14 @@ public class HDFSAuditDestination extends AuditDestination {
                if (System.currentTimeMillis() - fileCreateTime.getTime() > 
fileRolloverSec * 1000) {
                        logger.info("Closing file. Rolling over. name=" + 
getName()
                                        + ", fileName=" + currentFileName);
-                       logWriter.flush();
-                       logWriter.close();
+                       try {
+                               logWriter.flush();
+                               logWriter.close();
+                       } catch (Throwable t) {
+                               logger.error("Error on closing log writter. 
Exception will be ignored. name="
+                                               + getName() + ", fileName=" + 
currentFileName);
+                       }
+
                        logWriter = null;
                        currentFileName = null;
                }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 39a2578..2c6a87f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -32,8 +32,8 @@ public abstract class AuditEventBase {
        
        public abstract String getEventKey();
        public abstract Date getEventTime ();
-       public abstract void setEventCount(long frequencyCount);
-       public abstract void setEventDurationMS(long frequencyDurationMS);
+       public abstract void setEventCount(long eventCount);
+       public abstract void setEventDurationMS(long eventDurationMS);
        
        protected String trim(String str, int len) {
                String ret = str;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 53adc86..c3a0c78 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -68,7 +68,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
                mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize);
        }
 
-       public AsyncAuditProvider(String name, int maxQueueSize, int 
maxFlushInterval, AuditProvider provider) {
+       public AsyncAuditProvider(String name, int maxQueueSize, int 
maxFlushInterval, AuditHandler provider) {
                this(name, maxQueueSize, maxFlushInterval);
 
                addAuditProvider(provider);
@@ -174,21 +174,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
                while(ret == null) {
                        logSummaryIfRequired();
 
-                       if (mMaxFlushInterval > 0 && isFlushPending()) {
-                               long timeTillNextFlush = getTimeTillNextFlush();
-
-                               if (timeTillNextFlush <= 0) {
-                                       break; // force flush
-                               }
-
-                               ret = mQueue.poll(timeTillNextFlush, 
TimeUnit.MILLISECONDS);
-                       } else {
+//                     if (mMaxFlushInterval > 0 && isFlushPending()) {
+//                             long timeTillNextFlush = getTimeTillNextFlush();
+//
+//                             if (timeTillNextFlush <= 0) {
+//                                     break; // force flush
+//                             }
+//
+//                             ret = mQueue.poll(timeTillNextFlush, 
TimeUnit.MILLISECONDS);
+//                     } else {
                                // Let's wake up for summary logging
                                long waitTime = intervalLogDurationMS - 
(System.currentTimeMillis() - lastIntervalLogTime);
                                waitTime = waitTime <= 0 ? 
intervalLogDurationMS : waitTime;
 
                                ret = mQueue.poll(waitTime, 
TimeUnit.MILLISECONDS);
-                       }
+//                     }
                }
 
                if(ret != null) {
@@ -246,23 +246,23 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
                LOG.debug("<== AsyncAuditProvider.waitToComplete()");
        }
 
-       private long getTimeTillNextFlush() {
-               long timeTillNextFlush = mMaxFlushInterval;
-
-               if (mMaxFlushInterval > 0) {
-                       long lastFlushTime = getLastFlushTime();
-
-                       if (lastFlushTime != 0) {
-                               long timeSinceLastFlush = 
System.currentTimeMillis()
-                                               - lastFlushTime;
-
-                               if (timeSinceLastFlush >= mMaxFlushInterval)
-                                       timeTillNextFlush = 0;
-                               else
-                                       timeTillNextFlush = mMaxFlushInterval - 
timeSinceLastFlush;
-                       }
-               }
-
-               return timeTillNextFlush;
-       }
+//     private long getTimeTillNextFlush() {
+//             long timeTillNextFlush = mMaxFlushInterval;
+//
+//             if (mMaxFlushInterval > 0) {
+//                     long lastFlushTime = getLastFlushTime();
+//
+//                     if (lastFlushTime != 0) {
+//                             long timeSinceLastFlush = 
System.currentTimeMillis()
+//                                             - lastFlushTime;
+//
+//                             if (timeSinceLastFlush >= mMaxFlushInterval)
+//                                     timeTillNextFlush = 0;
+//                             else
+//                                     timeTillNextFlush = mMaxFlushInterval - 
timeSinceLastFlush;
+//                     }
+//             }
+//
+//             return timeTillNextFlush;
+//     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
new file mode 100644
index 0000000..7b51f1d
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.ranger.audit.model.AuditEventBase;
+
+public interface AuditHandler {
+       public boolean log(AuditEventBase event);
+       public boolean log(Collection<AuditEventBase> events);  
+
+       public boolean logJSON(String event);
+       public boolean logJSON(Collection<String> events);      
+
+    public void init(Properties prop);
+    public void init(Properties prop, String basePropertyName);
+    public void start();
+    public void stop();
+    public void waitToComplete();
+    public void waitToComplete(long timeout);
+
+    /**
+     * Name for this provider. Used only during logging. Uniqueness is not 
guaranteed
+     */
+    public String getName();
+
+    public void    flush();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
deleted file mode 100644
index 0e38624..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.Collection;
-import java.util.Properties;
-
-import org.apache.ranger.audit.model.AuditEventBase;
-
-public interface AuditProvider {
-       public boolean log(AuditEventBase event);
-       public boolean log(Collection<AuditEventBase> events);  
-
-       public boolean logJSON(String event);
-       public boolean logJSON(Collection<String> events);      
-
-    public void init(Properties prop);
-    public void init(Properties prop, String basePropertyName);
-    public void start();
-    public void stop();
-    public void waitToComplete();
-    public void waitToComplete(long timeout);
-
-    /**
-     * Name for this provider. Used only during logging. Uniqueness is not 
guaranteed
-     */
-    public String getName();
-
-    /**
-     * If this AuditProvider in the state of shutdown
-     * @return
-     */
-    public boolean isDrain();
-    
-    public int getMaxBatchSize();
-    public int getMaxBatchInterval();
-       public boolean isFlushPending();
-       public long    getLastFlushTime();
-    public void    flush();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index a67f7e0..7b2b52b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -31,6 +31,7 @@ import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
 import org.apache.ranger.audit.queue.AuditAsyncQueue;
 import org.apache.ranger.audit.queue.AuditBatchQueue;
+import org.apache.ranger.audit.queue.AuditQueue;
 import org.apache.ranger.audit.queue.AuditSummaryQueue;
 
 /*
@@ -58,7 +59,7 @@ public class AuditProviderFactory {
 
        private static AuditProviderFactory sFactory;
 
-       private AuditProvider mProvider = null;
+       private AuditHandler mProvider = null;
        private boolean mInitDone = false;
 
        private AuditProviderFactory() {
@@ -79,11 +80,11 @@ public class AuditProviderFactory {
                return sFactory;
        }
 
-       public static AuditProvider getAuditProvider() {
+       public static AuditHandler getAuditProvider() {
                return AuditProviderFactory.getInstance().getProvider();
        }
 
-       public AuditProvider getProvider() {
+       public AuditHandler getProvider() {
                return mProvider;
        }
 
@@ -118,7 +119,7 @@ public class AuditProviderFactory {
                boolean isAuditToSolrEnabled = 
MiscUtil.getBooleanProperty(props,
                                AUDIT_SOLR_IS_ENABLED_PROP, false);
 
-               List<AuditProvider> providers = new ArrayList<AuditProvider>();
+               List<AuditHandler> providers = new ArrayList<AuditHandler>();
 
                // TODO: Delete me
                for (Object propNameObj : props.keySet()) {
@@ -150,17 +151,16 @@ public class AuditProviderFactory {
 
                for (String destName : destNameList) {
                        String destPropPrefix = AUDIT_DEST_BASE + "." + 
destName;
-                       AuditProvider destProvider = 
getProviderFromConfig(props,
-                                       destPropPrefix, destName);
+                       AuditHandler destProvider = getProviderFromConfig(props,
+                                       destPropPrefix, destName, null);
 
                        if (destProvider != null) {
                                destProvider.init(props, destPropPrefix);
 
                                String queueName = 
MiscUtil.getStringProperty(props,
-                                               destPropPrefix + "." + 
BaseAuditProvider.PROP_QUEUE);
+                                               destPropPrefix + "." + 
AuditQueue.PROP_QUEUE);
                                if (queueName == null || queueName.isEmpty()) {
-                                       LOG.info(destPropPrefix + "."
-                                                       + 
BaseAuditProvider.PROP_QUEUE
+                                       LOG.info(destPropPrefix + "." + 
AuditQueue.PROP_QUEUE
                                                        + " is not set. Setting 
queue to batch for "
                                                        + destName);
                                        queueName = "batch";
@@ -169,16 +169,15 @@ public class AuditProviderFactory {
                                if (queueName != null && !queueName.isEmpty()
                                                && 
!queueName.equalsIgnoreCase("none")) {
                                        String queuePropPrefix = destPropPrefix 
+ "." + queueName;
-                                       AuditProvider queueProvider = 
getProviderFromConfig(props,
-                                                       queuePropPrefix, 
queueName);
+                                       AuditHandler queueProvider = 
getProviderFromConfig(props,
+                                                       queuePropPrefix, 
queueName, destProvider);
                                        if (queueProvider != null) {
-                                               if (queueProvider instanceof 
BaseAuditProvider) {
-                                                       BaseAuditProvider 
qProvider = (BaseAuditProvider) queueProvider;
-                                                       
qProvider.setConsumer(destProvider);
+                                               if (queueProvider instanceof 
AuditQueue) {
+                                                       AuditQueue qProvider = 
(AuditQueue) queueProvider;
                                                        qProvider.init(props, 
queuePropPrefix);
                                                        
providers.add(queueProvider);
                                                } else {
-                                                       LOG.fatal("Provider 
queue doesn't extend BaseAuditProvider destination "
+                                                       LOG.fatal("Provider 
queue doesn't extend AuditQueue. Destination="
                                                                        + 
destName
                                                                        + " 
can't be created. queueName="
                                                                        + 
queueName);
@@ -196,51 +195,51 @@ public class AuditProviderFactory {
                }
                if (providers.size() > 0) {
                        LOG.info("Using v3 audit configuration");
-                       AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
-                       String propPrefix = 
BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
-                                       + "async";
-                       asyncQueue.init(props, propPrefix);
+                       AuditHandler consumer = providers.get(0);
+
+                       // Possible pipeline is:
+                       // async_queue -> summary_queue -> multidestination -> 
batch_queue
+                       // -> hdfs_destination
+                       // -> batch_queue -> solr_destination
+                       // -> batch_queue -> kafka_destination
+                       // Above, up to multidestination, the providers are 
same, then it
+                       // branches out in parallel.
+
+                       // Set the providers in the reverse order e.g.
+
+                       if (providers.size() > 1) {
+                               // If there are more than one destination, then 
we need multi
+                               // destination to process it in parallel
+                               LOG.info("MultiDestAuditProvider is used. 
Destination count="
+                                               + providers.size());
+                               MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
+                               multiDestProvider.init(props);
+                               multiDestProvider.addAuditProviders(providers);
+                               consumer = multiDestProvider;
+                       }
 
-                       propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX;
+                       // Let's see if Summary is enabled, then summarize 
before sending it
+                       // downstream
+                       String propPrefix = 
BaseAuditHandler.PROP_DEFAULT_PREFIX;
                        boolean summaryEnabled = 
MiscUtil.getBooleanProperty(props,
                                        propPrefix + "." + "summary" + "." + 
"enabled", false);
                        AuditSummaryQueue summaryQueue = null;
                        if (summaryEnabled) {
                                LOG.info("AuditSummaryQueue is enabled");
-                               summaryQueue = new AuditSummaryQueue();
+                               summaryQueue = new AuditSummaryQueue(consumer);
                                summaryQueue.init(props, propPrefix);
-                               asyncQueue.setConsumer(summaryQueue);
+                               consumer = summaryQueue;
                        } else {
                                LOG.info("AuditSummaryQueue is disabled");
                        }
 
-                       if (providers.size() == 1) {
-                               if (summaryEnabled) {
-                                       LOG.info("Setting " + 
providers.get(0).getName()
-                                                       + " as consumer to 
AuditSummaryQueue");
-                                       
summaryQueue.setConsumer(providers.get(0));
-                               } else {
-                                       LOG.info("Setting " + 
providers.get(0).getName()
-                                                       + " as consumer to " + 
asyncQueue.getName());
-                                       
asyncQueue.setConsumer(providers.get(0));
-                               }
-                       } else {
-                               MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
-                               multiDestProvider.init(props);
-                               multiDestProvider.addAuditProviders(providers);
-                               if (summaryEnabled) {
-                                       LOG.info("Setting " + 
multiDestProvider.getName()
-                                                       + " as consumer to 
AuditSummaryQueue");
-                                       
summaryQueue.setConsumer(multiDestProvider);
-                               } else {
-                                       LOG.info("Setting " + 
multiDestProvider.getName()
-                                                       + " as consumer to " + 
asyncQueue.getName());
-                                       
asyncQueue.setConsumer(multiDestProvider);
-                               }
-                       }
+                       // Create the AsysnQueue
+                       AuditAsyncQueue asyncQueue = new 
AuditAsyncQueue(consumer);
+                       propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." 
+ "async";
+                       asyncQueue.init(props, propPrefix);
 
                        mProvider = asyncQueue;
-                       LOG.info("Starting " + mProvider.getName());
+                       LOG.info("Starting audit queue " + mProvider.getName());
                        mProvider.start();
                } else {
                        LOG.info("No v3 audit configuration found. Trying v2 
audit configurations");
@@ -315,9 +314,7 @@ public class AuditProviderFactory {
 
                                if (kafkaProvider.isAsync()) {
                                        AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
-                                                       "MyKafkaAuditProvider",
-                                                       
kafkaProvider.getMaxQueueSize(),
-                                                       
kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+                                                       "MyKafkaAuditProvider", 
1000, 1000, kafkaProvider);
                                        providers.add(asyncProvider);
                                } else {
                                        providers.add(kafkaProvider);
@@ -331,9 +328,7 @@ public class AuditProviderFactory {
 
                                if (solrProvider.isAsync()) {
                                        AsyncAuditProvider asyncProvider = new 
AsyncAuditProvider(
-                                                       "MySolrAuditProvider",
-                                                       
solrProvider.getMaxQueueSize(),
-                                                       
solrProvider.getMaxBatchInterval(), solrProvider);
+                                                       "MySolrAuditProvider", 
1000, 1000, solrProvider);
                                        providers.add(asyncProvider);
                                } else {
                                        providers.add(solrProvider);
@@ -387,18 +382,26 @@ public class AuditProviderFactory {
                Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
        }
 
-       private AuditProvider getProviderFromConfig(Properties props,
-                       String propPrefix, String providerName) {
-               AuditProvider provider = null;
+       private AuditHandler getProviderFromConfig(Properties props,
+                       String propPrefix, String providerName, AuditHandler 
consumer) {
+               AuditHandler provider = null;
                String className = MiscUtil.getStringProperty(props, propPrefix 
+ "."
-                               + BaseAuditProvider.PROP_CLASS_NAME);
+                               + BaseAuditHandler.PROP_CLASS_NAME);
                if (className != null && !className.isEmpty()) {
                        try {
-                               provider = (AuditProvider) 
Class.forName(className)
-                                               .newInstance();
+                               // Load the configured class once; both branches
+                               // below instantiate from this same Class object.
+                               Class<?> handlerClass = Class.forName(className);
+                               // isAssignableFrom() is asked of the supertype:
+                               // "is handlerClass an AuditQueue or a subclass?"
+                               if (AuditQueue.class.isAssignableFrom(handlerClass)) {
+                                       // Queue class needs consumer. Assign the new
+                                       // instance so it is returned to the caller.
+                                       provider = (AuditHandler) handlerClass
+                                                       .getDeclaredConstructor(AuditHandler.class)
+                                                       .newInstance(consumer);
+                               } else {
+                                       // Any other handler is created through its
+                                       // no-arg constructor.
+                                       provider = (AuditHandler) handlerClass
+                                                       .newInstance();
+                               }
                        } catch (Exception e) {
                                LOG.fatal("Can't instantiate audit class for 
providerName="
-                                               + providerName + ", className=" 
+ className, e);
+                                               + providerName + ", className=" 
+ className
+                                               + ", propertyPrefix=" + 
propPrefix, e);
                        }
                } else {
                        if (providerName.equals("file")) {
@@ -414,25 +417,32 @@ public class AuditProviderFactory {
                        } else if (providerName.equals("log4j")) {
                                provider = new Log4jAuditProvider();
                        } else if (providerName.equals("batch")) {
-                               provider = new AuditBatchQueue();
+                               provider = new AuditBatchQueue(consumer);
                        } else if (providerName.equals("async")) {
-                               provider = new AuditAsyncQueue();
+                               provider = new AuditAsyncQueue(consumer);
                        } else {
                                LOG.error("Provider name doesn't have any class 
associated with it. providerName="
-                                               + providerName);
+                                               + providerName + ", 
propertyPrefix=" + propPrefix);
+                       }
+               }
+               if (provider != null && provider instanceof AuditQueue) {
+                       if (consumer == null) {
+                               LOG.fatal("consumer can't be null for 
AuditQueue. queue="
+                                               + provider.getName() + ", 
propertyPrefix=" + propPrefix);
+                               provider = null;
                        }
                }
                return provider;
        }
 
-       private AuditProvider getDefaultProvider() {
+       /**
+        * Returns a newly created DummyAuditProvider.
+        *
+        * @return a fresh DummyAuditProvider instance, typed as AuditHandler
+        */
+       private AuditHandler getDefaultProvider() {
                 return new DummyAuditProvider();
         }
 
        private static class JVMShutdownHook extends Thread {
-               AuditProvider mProvider;
+               AuditHandler mProvider;
 
-               public JVMShutdownHook(AuditProvider provider) {
+               public JVMShutdownHook(AuditHandler provider) {
                        mProvider = provider;
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
new file mode 100644
index 0000000..601650e
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import com.google.gson.GsonBuilder;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Base class for audit handlers. It keeps the handler's property prefix and
+ * name, adapts single-event and JSON calls onto the list-accepting
+ * {@code log} overload (declared outside this class), and offers
+ * rate-limited logging of errors and of audit events that could not be
+ * written.
+ */
+public abstract class BaseAuditHandler implements AuditHandler {
+       private static final Log LOG = LogFactory
+                       .getLog(BaseAuditHandler.class);
+
+       /** Property holding the minimum gap, in ms, between failure reports. */
+       private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP =
+                       "xasecure.audit.log.failure.report.min.interval.ms";
+
+       private int mLogFailureReportMinIntervalInMs = 60 * 1000;
+
+       // Bookkeeping for the throttled failed-event reports.
+       private final AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
+       private final AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
+       private final AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
+
+       public static final String PROP_NAME = "name";
+       public static final String PROP_CLASS_NAME = "classname";
+
+       public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
+
+       /** Property prefix in effect; replaced by a non-null init() argument. */
+       protected String propPrefix = PROP_DEFAULT_PREFIX;
+
+       protected String providerName = null;
+
+       // Not read in this class. NOTE(review): presumably retry settings for
+       // subclasses -- confirm against the handlers that extend this class.
+       protected int failedRetryTimes = 3;
+       protected int failedRetrySleep = 3 * 1000;
+
+       // Throttle state for logError(): a message is emitted only when more
+       // than errorLogIntervalMS has passed since the last emitted one.
+       int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+       long lastErrorLogMS = 0;
+
+       protected Properties props = null;
+
+       /**
+        * Initializes the handler with the default property prefix.
+        *
+        * @param props configuration properties
+        */
+       @Override
+       public void init(Properties props) {
+               init(props, null);
+       }
+
+       /**
+        * Stores the properties, resolves the property prefix and the handler
+        * name, and reads the failure-report interval.
+        *
+        * @param props            configuration properties
+        * @param basePropertyName property prefix for this handler; when
+        *                         {@code null} the current prefix is kept
+        */
+       @Override
+       public void init(Properties props, String basePropertyName) {
+               LOG.info("BaseAuditHandler.init()");
+               this.props = props;
+               if (basePropertyName != null) {
+                       propPrefix = basePropertyName;
+               }
+               LOG.info("propPrefix=" + propPrefix);
+
+               // Look the name up under the resolved prefix, so a null
+               // basePropertyName does not turn into the key "null.name".
+               String name = MiscUtil.getStringProperty(props, propPrefix + "."
+                               + PROP_NAME);
+               if (name != null && !name.isEmpty()) {
+                       providerName = name;
+               }
+               if (providerName == null) {
+                       // Fall back to the final token of the prefix.
+                       List<String> tokens = MiscUtil.toArray(propPrefix, ".");
+                       if (tokens != null && !tokens.isEmpty()) {
+                               providerName = tokens.get(tokens.size() - 1);
+                       } else {
+                               // Nothing to split: use the prefix itself
+                               // rather than failing on an empty list.
+                               providerName = propPrefix;
+                       }
+                       LOG.info("Using providerName from property prefix. providerName="
+                                       + providerName);
+               }
+               LOG.info("providerName=" + providerName);
+
+               // The created Gson object is discarded. NOTE(review): this looks
+               // like a check that Gson is usable -- confirm before removing.
+               try {
+                       new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z")
+                                       .create();
+               } catch (Throwable excp) {
+                       LOG.warn("BaseAuditHandler.init(): failed to create "
+                                       + "GsonBuilder object. events will be formatted "
+                                       + "using toString(), instead of Json", excp);
+               }
+
+               mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
+                               AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+       }
+
+       /**
+        * Logs a single event by wrapping it in a one-element list.
+        *
+        * @param event the event to log
+        * @return the result of the list-accepting {@code log} call
+        */
+       @Override
+       public boolean log(AuditEventBase event) {
+               List<AuditEventBase> eventList = new ArrayList<AuditEventBase>(1);
+               eventList.add(event);
+               return log(eventList);
+       }
+
+       /**
+        * Converts the JSON text to an {@link AuthzAuditEvent} with
+        * {@code MiscUtil.fromJson} and logs it.
+        *
+        * @param event JSON text of one event
+        * @return the result of {@link #log(AuditEventBase)}
+        */
+       @Override
+       public boolean logJSON(String event) {
+               AuditEventBase eventObj = MiscUtil.fromJson(event,
+                               AuthzAuditEvent.class);
+               return log(eventObj);
+       }
+
+       /**
+        * Logs each JSON event in iteration order and stops at the first one
+        * that is not accepted.
+        *
+        * @param events JSON texts to log
+        * @return {@code false} as soon as one event fails, otherwise
+        *         {@code true} (also for an empty collection)
+        */
+       @Override
+       public boolean logJSON(Collection<String> events) {
+               for (String event : events) {
+                       if (!logJSON(event)) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       public void setName(String name) {
+               providerName = name;
+       }
+
+       @Override
+       public String getName() {
+               return providerName;
+       }
+
+       /**
+        * Reports a failed event without an associated exception.
+        *
+        * @param event the event that could not be logged
+        */
+       public void logFailedEvent(AuditEventBase event) {
+               logFailedEvent(event, null);
+       }
+
+       /**
+        * Writes an error message unless one was written within the last
+        * {@code errorLogIntervalMS} milliseconds.
+        *
+        * @param msg the message
+        */
+       public void logError(String msg) {
+               if (isErrorLogDue()) {
+                       LOG.error(msg);
+               }
+       }
+
+       /**
+        * Same as {@link #logError(String)}, with the exception attached.
+        *
+        * @param msg the message
+        * @param ex  the exception to log with the message
+        */
+       public void logError(String msg, Throwable ex) {
+               if (isErrorLogDue()) {
+                       LOG.error(msg, ex);
+               }
+       }
+
+       /** Checks the logError() throttle and, when due, restarts the window. */
+       private boolean isErrorLogDue() {
+               long currTimeMS = System.currentTimeMillis();
+               if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+                       lastErrorLogMS = currTimeMS;
+                       return true;
+               }
+               return false;
+       }
+
+       /**
+        * Formats the absolute difference of two millisecond timestamps.
+        *
+        * @param time1 first timestamp in ms
+        * @param time2 second timestamp in ms
+        * @return the difference rendered by {@link #formatIntervalForLog(long)}
+        */
+       public String getTimeDiffStr(long time1, long time2) {
+               long timeInMs = Math.abs(time1 - time2);
+               return formatIntervalForLog(timeInMs);
+       }
+
+       /**
+        * Renders an interval using the largest non-zero unit among hours,
+        * minutes, seconds and milliseconds.
+        *
+        * @param timeInMs interval length in milliseconds
+        * @return for example {@code "01.500 seconds"} for 1500
+        */
+       public String formatIntervalForLog(long timeInMs) {
+               long hours = timeInMs / (60 * 60 * 1000);
+               long minutes = (timeInMs / (60 * 1000)) % 60;
+               long seconds = (timeInMs % (60 * 1000)) / 1000;
+               long mSeconds = (timeInMs % (1000));
+
+               if (hours > 0) {
+                       return String.format("%02d:%02d:%02d.%03d hours", hours,
+                                       minutes, seconds, mSeconds);
+               } else if (minutes > 0) {
+                       return String.format("%02d:%02d.%03d minutes", minutes,
+                                       seconds, mSeconds);
+               } else if (seconds > 0) {
+                       return String.format("%02d.%03d seconds", seconds, mSeconds);
+               } else {
+                       return String.format("%03d milli-seconds", mSeconds);
+               }
+       }
+
+       /**
+        * Counts a failed event and, when the report interval has elapsed,
+        * writes a warning with the event and the failure statistics.
+        *
+        * @param event the event that could not be logged
+        * @param excp  the cause, or {@code null}
+        */
+       public void logFailedEvent(AuditEventBase event, Throwable excp) {
+               FailureStats stats = recordFailure();
+               if (stats != null) {
+                       // Stringify only when a report is actually written.
+                       reportFailure("failed to log audit event: "
+                                       + MiscUtil.stringify(event), excp, stats);
+               }
+       }
+
+       /**
+        * Reports every event of the collection through
+        * {@link #logFailedEvent(AuditEventBase, Throwable)}.
+        *
+        * @param events the events that could not be logged
+        * @param excp   the cause, or {@code null}
+        */
+       public void logFailedEvent(Collection<AuditEventBase> events,
+                       Throwable excp) {
+               for (AuditEventBase event : events) {
+                       logFailedEvent(event, excp);
+               }
+       }
+
+       /**
+        * JSON-text variant of
+        * {@link #logFailedEvent(AuditEventBase, Throwable)}.
+        *
+        * @param event JSON text of the event that could not be logged
+        * @param excp  the cause, or {@code null}
+        */
+       public void logFailedEventJSON(String event, Throwable excp) {
+               FailureStats stats = recordFailure();
+               if (stats != null) {
+                       reportFailure("failed to log audit event: " + event, excp,
+                                       stats);
+               }
+       }
+
+       /**
+        * Reports every JSON event of the collection through
+        * {@link #logFailedEventJSON(String, Throwable)}.
+        *
+        * @param events JSON texts of the events that could not be logged
+        * @param excp   the cause, or {@code null}
+        */
+       public void logFailedEventJSON(Collection<String> events,
+                       Throwable excp) {
+               for (String event : events) {
+                       logFailedEventJSON(event, excp);
+               }
+       }
+
+       /**
+        * Increments the failure counters and decides whether this call should
+        * write a report.
+        *
+        * @return the statistics to print, or {@code null} when the report
+        *         interval has not elapsed or another caller took this report
+        */
+       private FailureStats recordFailure() {
+               long now = System.currentTimeMillis();
+               long lastReport = mFailedLogLastReportTime.get();
+               long timeSinceLastReport = now - lastReport;
+
+               mFailedLogCountSinceLastReport.incrementAndGet();
+               long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+               if (timeSinceLastReport < mLogFailureReportMinIntervalInMs) {
+                       return null;
+               }
+               // Only the caller that moves the timestamp forward reports.
+               if (!mFailedLogLastReportTime.compareAndSet(lastReport, now)) {
+                       return null;
+               }
+               // Read and reset in one step so no increment is dropped.
+               long countSinceLastReport = mFailedLogCountSinceLastReport
+                               .getAndSet(0);
+               return new FailureStats(countSinceLastReport, timeSinceLastReport,
+                               countLifeTime);
+       }
+
+       /** Writes the failure warning and, after the first failure, the stats. */
+       private void reportFailure(String msg, Throwable excp,
+                       FailureStats stats) {
+               if (excp != null) {
+                       LOG.warn(msg, excp);
+               } else {
+                       LOG.warn(msg);
+               }
+
+               if (stats.countLifeTime > 1) { // no stats to print for the 1st failure
+                       LOG.warn("Log failure count: " + stats.countSinceLastReport
+                                       + " in past "
+                                       + formatIntervalForLog(stats.timeSinceLastReport)
+                                       + "; " + stats.countLifeTime
+                                       + " during process lifetime");
+               }
+       }
+
+       /** Counter values captured at the moment a failure report is due. */
+       private static final class FailureStats {
+               final long countSinceLastReport;
+               final long timeSinceLastReport;
+               final long countLifeTime;
+
+               FailureStats(long countSinceLastReport, long timeSinceLastReport,
+                               long countLifeTime) {
+                       this.countSinceLastReport = countSinceLastReport;
+                       this.timeSinceLastReport = timeSinceLastReport;
+                       this.countLifeTime = countLifeTime;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
deleted file mode 100644
index 85c207b..0000000
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ranger.audit.provider;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.queue.AuditFileSpool;
-
-import com.google.gson.GsonBuilder;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-public abstract class BaseAuditProvider implements AuditProvider {
-       private static final Log LOG = 
LogFactory.getLog(BaseAuditProvider.class);
-
-       private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP 
= "xasecure.audit.log.failure.report.min.interval.ms";
-       public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
-       public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
-       public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
-
-       private AtomicLong lifeTimeInLogCount = new AtomicLong(0);
-
-       private int mLogFailureReportMinIntervalInMs = 60 * 1000;
-
-       private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
-       private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
-       private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
-
-       public static final String PROP_NAME = "name";
-       public static final String PROP_CLASS_NAME = "classname";
-       public static final String PROP_QUEUE = "queue";
-
-       public static final String PROP_BATCH_SIZE = "batch.size";
-       public static final String PROP_QUEUE_SIZE = "queue.size";
-       public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
-
-       public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
-       public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = 
"filespool.drain.full.wait.ms";
-       public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = 
"filespool.drain.threshold.percent";
-
-       public static final String PROP_DEFAULT_PREFIX = 
"xasecure.audit.provider";
-
-       private boolean isDrain = false;
-       private String providerName = null;
-
-       private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
-       private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
-       private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
-
-       protected int failedRetryTimes = 3;
-       protected int failedRetrySleep = 3 * 1000;
-
-       protected AuditProvider consumer = null;
-       protected AuditFileSpool fileSpooler = null;
-
-       protected boolean fileSpoolerEnabled = false;
-       protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
-       protected int fileSpoolDrainThresholdPercent = 80;
-
-       int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
-       long lastErrorLogMS = 0;
-
-       protected Properties props = null;
-
-       public BaseAuditProvider() {
-       }
-
-       public BaseAuditProvider(AuditProvider consumer) {
-               this.consumer = consumer;
-       }
-
-       @Override
-       public void init(Properties props) {
-               init(props, null);
-       }
-
-       @Override
-       public void init(Properties props, String basePropertyName) {
-               LOG.info("BaseAuditProvider.init()");
-               this.props = props;
-               String propPrefix = PROP_DEFAULT_PREFIX;
-               if (basePropertyName != null) {
-                       propPrefix = basePropertyName;
-               }
-               LOG.info("propPrefix=" + propPrefix);
-               // Get final token
-               List<String> tokens = MiscUtil.toArray(propPrefix, ".");
-               String finalToken = tokens.get(tokens.size() - 1);
-
-               String name = MiscUtil.getStringProperty(props, 
basePropertyName + "."
-                               + PROP_NAME);
-               if (name != null && !name.isEmpty()) {
-                       providerName = name;
-               }
-               if (providerName == null) {
-                       providerName = finalToken;
-                       LOG.info("Using providerName from property prefix. 
providerName="
-                                       + providerName);
-               }
-               LOG.info("providerName=" + providerName);
-
-               setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
-                               + PROP_BATCH_SIZE, getMaxBatchSize()));
-               setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
-                               + PROP_QUEUE_SIZE, getMaxQueueSize()));
-               setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + 
"."
-                               + PROP_BATCH_INTERVAL, getMaxBatchInterval()));
-
-               fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, 
propPrefix
-                               + "." + PROP_FILE_SPOOL_ENABLE, false);
-               String logFolderProp = MiscUtil.getStringProperty(props, 
propPrefix
-                               + "." + 
AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
-               if (fileSpoolerEnabled || logFolderProp != null) {
-                       LOG.info("File spool is enabled for " + getName()
-                                       + ", logFolderProp=" + logFolderProp + 
", " + propPrefix
-                                       + "." + 
AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "="
-                                       + fileSpoolerEnabled);
-                       fileSpoolerEnabled = true;
-                       fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, 
propPrefix
-                                       + "." + 
PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
-                                       fileSpoolMaxWaitTime);
-                       fileSpoolDrainThresholdPercent = 
MiscUtil.getIntProperty(props,
-                                       propPrefix + "." + 
PROP_FILE_SPOOL_QUEUE_THRESHOLD,
-                                       fileSpoolDrainThresholdPercent);
-                       fileSpooler = new AuditFileSpool(this, consumer);
-                       fileSpooler.init(props, basePropertyName);
-               } else {
-                       LOG.info("File spool is disabled for " + getName());
-               }
-
-               try {
-                       new 
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
-               } catch (Throwable excp) {
-                       LOG.warn(
-                                       "Log4jAuditProvider.init(): failed to 
create GsonBuilder object. events will be formated using toString(), instead of 
Json",
-                                       excp);
-               }
-
-               mLogFailureReportMinIntervalInMs = 
MiscUtil.getIntProperty(props,
-                               AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 
* 1000);
-
-       }
-
-       public AuditProvider getConsumer() {
-               return consumer;
-       }
-
-       public void setConsumer(AuditProvider consumer) {
-               this.consumer = consumer;
-       }
-
-       public void logFailedEvent(AuditEventBase event) {
-               logFailedEvent(event, null);
-       }
-
-       public void logFailedEvent(AuditEventBase event, Throwable excp) {
-               long now = System.currentTimeMillis();
-
-               long timeSinceLastReport = now - mFailedLogLastReportTime.get();
-               long countSinceLastReport = mFailedLogCountSinceLastReport
-                               .incrementAndGet();
-               long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
-               if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
-                       mFailedLogLastReportTime.set(now);
-                       mFailedLogCountSinceLastReport.set(0);
-
-                       if (excp != null) {
-                               LOG.warn(
-                                               "failed to log audit event: "
-                                                               + MiscUtil.stringify(event), excp);
-                       } else {
-                               LOG.warn("failed to log audit event: "
-                                               + MiscUtil.stringify(event));
-                       }
-
-                       if (countLifeTime > 1) { // no stats to print for the 1st failure
-                               LOG.warn("Log failure count: " + countSinceLastReport
-                                               + " in past "
-                                               + formatIntervalForLog(timeSinceLastReport) + "; "
-                                               + countLifeTime + " during process lifetime");
-                       }
-               }
-       }
-
-       public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
-               for (AuditEventBase event : events) {
-                       logFailedEvent(event, excp);
-               }
-       }
-
-       public void logFailedEventJSON(String event, Throwable excp) {
-               long now = System.currentTimeMillis();
-
-               long timeSinceLastReport = now - mFailedLogLastReportTime.get();
-               long countSinceLastReport = mFailedLogCountSinceLastReport
-                               .incrementAndGet();
-               long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
-               if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
-                       mFailedLogLastReportTime.set(now);
-                       mFailedLogCountSinceLastReport.set(0);
-
-                       if (excp != null) {
-                               LOG.warn("failed to log audit event: " + event, excp);
-                       } else {
-                               LOG.warn("failed to log audit event: " + event);
-                       }
-
-                       if (countLifeTime > 1) { // no stats to print for the 1st failure
-                               LOG.warn("Log failure count: " + countSinceLastReport
-                                               + " in past "
-                                               + formatIntervalForLog(timeSinceLastReport) + "; "
-                                               + countLifeTime + " during process lifetime");
-                       }
-               }
-       }
-
-       public void logFailedEventJSON(Collection<String> events, Throwable excp) {
-               for (String event : events) {
-                       logFailedEventJSON(event, excp);
-               }
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
-        * audit.model.AuditEventBase)
-        */
-       @Override
-       public boolean log(AuditEventBase event) {
-               List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
-               eventList.add(event);
-               return log(eventList);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String)
-        */
-       @Override
-       public boolean logJSON(String event) {
-               AuditEventBase eventObj = MiscUtil.fromJson(event,
-                               AuthzAuditEvent.class);
-               return log(eventObj);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection
-        * )
-        */
-       @Override
-       public boolean logJSON(Collection<String> events) {
-               boolean ret = true;
-               for (String event : events) {
-                       ret = logJSON(event);
-                       if (!ret) {
-                               break;
-                       }
-               }
-               return ret;
-       }
-
-       public void setName(String name) {
-               providerName = name;
-       }
-
-       @Override
-       public String getName() {
-               return providerName;
-       }
-
-       @Override
-       public boolean isDrain() {
-               return isDrain;
-       }
-
-       public void setDrain(boolean isDrain) {
-               this.isDrain = isDrain;
-       }
-
-       public int getMaxQueueSize() {
-               return maxQueueSize;
-       }
-
-       public void setMaxQueueSize(int maxQueueSize) {
-               this.maxQueueSize = maxQueueSize;
-       }
-
-       @Override
-       public int getMaxBatchInterval() {
-               return maxBatchInterval;
-       }
-
-       public void setMaxBatchInterval(int maxBatchInterval) {
-               this.maxBatchInterval = maxBatchInterval;
-       }
-
-       @Override
-       public int getMaxBatchSize() {
-               return maxBatchSize;
-       }
-
-       public void setMaxBatchSize(int maxBatchSize) {
-               this.maxBatchSize = maxBatchSize;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
-        */
-       @Override
-       public void waitToComplete() {
-               if (consumer != null) {
-                       consumer.waitToComplete(-1);
-               }
-       }
-
-       @Override
-       public void waitToComplete(long timeout) {
-               if (consumer != null) {
-                       consumer.waitToComplete(timeout);
-               }
-       }
-
-       @Override
-       public boolean isFlushPending() {
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-        */
-       @Override
-       public long getLastFlushTime() {
-               if (consumer != null) {
-                       return consumer.getLastFlushTime();
-               }
-               return 0;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-        */
-       @Override
-       public void flush() {
-               if (consumer != null) {
-                       consumer.flush();
-               }
-       }
-
-       public AtomicLong getLifeTimeInLogCount() {
-               return lifeTimeInLogCount;
-       }
-
-       public long addLifeTimeInLogCount(long count) {
-               return lifeTimeInLogCount.addAndGet(count);
-       }
-
-       public void logError(String msg) {
-               long currTimeMS = System.currentTimeMillis();
-               if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
-                       LOG.error(msg);
-                       lastErrorLogMS = currTimeMS;
-               }
-       }
-
-       public void logError(String msg, Throwable ex) {
-               long currTimeMS = System.currentTimeMillis();
-               if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
-                       LOG.error(msg, ex);
-                       lastErrorLogMS = currTimeMS;
-               }
-       }
-
-       public String getTimeDiffStr(long time1, long time2) {
-               long timeInMs = Math.abs(time1 - time2);
-               return formatIntervalForLog(timeInMs);
-       }
-
-       public String formatIntervalForLog(long timeInMs) {
-               long hours = timeInMs / (60 * 60 * 1000);
-               long minutes = (timeInMs / (60 * 1000)) % 60;
-               long seconds = (timeInMs % (60 * 1000)) / 1000;
-               long mSeconds = (timeInMs % (1000));
-
-               if (hours > 0)
-                       return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
-                                       seconds, mSeconds);
-               else if (minutes > 0)
-                       return String.format("%02d:%02d.%03d minutes", minutes, seconds,
-                                       mSeconds);
-               else if (seconds > 0)
-                       return String.format("%02d.%03d seconds", seconds, mSeconds);
-               else
-                       return String.format("%03d milli-seconds", mSeconds);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index ab6a74a..ca842f3 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -23,7 +23,7 @@ import java.util.Properties;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
-public abstract class BufferedAuditProvider extends BaseAuditProvider {
+public abstract class BufferedAuditProvider extends BaseAuditHandler {
        private LogBuffer<AuditEventBase> mBuffer = null;
        private LogDestination<AuditEventBase> mDestination = null;
 
@@ -107,16 +107,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
        }
 
        @Override
-       public boolean isFlushPending() {
-               return false;
-       }
-
-       @Override
-       public long getLastFlushTime() {
-               return 0;
-       }
-
-       @Override
        public void flush() {
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f4bd90c..d475f89 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -177,16 +177,6 @@ public class DbAuditProvider extends AuditDestination {
        }
 
        @Override
-       public boolean isFlushPending() {
-               return mUncommitted.size() > 0;
-       }
-       
-       @Override
-       public long getLastFlushTime() {
-               return mLastCommitTime;
-       }
-
-       @Override
        public void flush() {
                if(mUncommitted.size() > 0) {
                        boolean isSuccess = commitTransaction();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
index 619a99d..05f882f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
 
-public class DummyAuditProvider implements AuditProvider {
+public class DummyAuditProvider implements AuditHandler {
        @Override
        public void init(Properties prop) {
                // intentionally left empty
@@ -74,23 +74,6 @@ public class DummyAuditProvider implements AuditProvider {
                // intentionally left empty
        }
 
-       
-       @Override
-       public int getMaxBatchSize() {
-               // TODO Auto-generated method stub
-               return 0;
-       }
-
-       @Override
-       public boolean isFlushPending() {
-               return false;
-       }
-       
-       @Override
-       public long getLastFlushTime() {
-               return 0;
-       }
-
        @Override
        public void flush() {
                // intentionally left empty
@@ -120,20 +103,4 @@ public class DummyAuditProvider implements AuditProvider {
                return this.getClass().getName();
        }
 
-       /* (non-Javadoc)
-        * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
-        */
-       @Override
-       public boolean isDrain() {
-               return false;
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval()
-        */
-       @Override
-       public int getMaxBatchInterval() {
-               // TODO Auto-generated method stub
-               return 0;
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index 040a045..0402de2 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -27,8 +27,6 @@ import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
-import com.sun.tools.hat.internal.util.Misc;
-
 
 public class Log4jAuditProvider extends AuditDestination {
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 876fa5b..4c1593a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -26,18 +26,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 
-public class MultiDestAuditProvider extends BaseAuditProvider {
+public class MultiDestAuditProvider extends BaseAuditHandler {
 
        private static final Log LOG = LogFactory
                        .getLog(MultiDestAuditProvider.class);
 
-       protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
+       protected List<AuditHandler> mProviders = new ArrayList<AuditHandler>();
 
        public MultiDestAuditProvider() {
                LOG.info("MultiDestAuditProvider: creating..");
        }
 
-       public MultiDestAuditProvider(AuditProvider provider) {
+       public MultiDestAuditProvider(AuditHandler provider) {
                addAuditProvider(provider);
        }
 
@@ -47,7 +47,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
                super.init(props);
 
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.init(props);
                        } catch (Throwable excp) {
@@ -57,7 +57,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
                }
        }
 
-       public void addAuditProvider(AuditProvider provider) {
+       public void addAuditProvider(AuditHandler provider) {
                if (provider != null) {
                        LOG.info("MultiDestAuditProvider.addAuditProvider(providerType="
                                        + provider.getClass().getCanonicalName() + ")");
@@ -66,9 +66,9 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
                }
        }
 
-       public void addAuditProviders(List<AuditProvider> providers) {
+       public void addAuditProviders(List<AuditHandler> providers) {
                if (providers != null) {
-                       for (AuditProvider provider : providers) {
+                       for (AuditHandler provider : providers) {
                                LOG.info("Adding " + provider.getName()
                                                + " as consumer to MultiDestination " + getName());
                                addAuditProvider(provider);
@@ -78,7 +78,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public boolean log(AuditEventBase event) {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.log(event);
                        } catch (Throwable excp) {
@@ -90,7 +90,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public boolean log(Collection<AuditEventBase> events) {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.log(events);
                        } catch (Throwable excp) {
@@ -102,7 +102,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public boolean logJSON(String event) {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.logJSON(event);
                        } catch (Throwable excp) {
@@ -114,7 +114,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public boolean logJSON(Collection<String> events) {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.logJSON(events);
                        } catch (Throwable excp) {
@@ -126,7 +126,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public void start() {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.start();
                        } catch (Throwable excp) {
@@ -138,7 +138,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public void stop() {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.stop();
                        } catch (Throwable excp) {
@@ -150,7 +150,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public void waitToComplete() {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.waitToComplete();
                        } catch (Throwable excp) {
@@ -163,7 +163,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
        @Override
        public void waitToComplete(long timeout) {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.waitToComplete(timeout);
                        } catch (Throwable excp) {
@@ -175,35 +175,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
        }
 
        @Override
-       public boolean isFlushPending() {
-               for (AuditProvider provider : mProviders) {
-                       if (provider.isFlushPending()) {
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       @Override
-       public long getLastFlushTime() {
-               long lastFlushTime = 0;
-               for (AuditProvider provider : mProviders) {
-                       long flushTime = provider.getLastFlushTime();
-
-                       if (flushTime != 0) {
-                               if (lastFlushTime == 0 || lastFlushTime > flushTime) {
-                                       lastFlushTime = flushTime;
-                               }
-                       }
-               }
-
-               return lastFlushTime;
-       }
-
-       @Override
        public void flush() {
-               for (AuditProvider provider : mProviders) {
+               for (AuditHandler provider : mProviders) {
                        try {
                                provider.flush();
                        } catch (Throwable excp) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 5f39e69..2c77b40 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -25,12 +25,12 @@ import kafka.producer.ProducerConfig;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
 import org.apache.ranger.audit.provider.MiscUtil;
 
-public class KafkaAuditProvider extends BaseAuditProvider {
+public class KafkaAuditProvider extends AuditDestination {
        private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);
 
        public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size";
@@ -47,11 +47,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
                LOG.info("init() called");
                super.init(props);
 
-               setMaxQueueSize(MiscUtil.getIntProperty(props,
-                               AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
-               setMaxBatchInterval(MiscUtil.getIntProperty(props,
-                               AUDIT_MAX_QUEUE_SIZE_PROP,
-                               AUDIT_BATCH_INTERVAL_DEFAULT_MS));
                topic = MiscUtil.getStringProperty(props,
                                AUDIT_KAFKA_TOPIC_NAME);
                if (topic == null || topic.isEmpty()) {
@@ -176,19 +171,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
        }
 
        @Override
-       public boolean isFlushPending() {
-               LOG.info("isFlushPending() called");
-               return false;
-       }
-
-       @Override
-       public long getLastFlushTime() {
-               LOG.info("getLastFlushTime() called");
-
-               return 0;
-       }
-
-       @Override
        public void flush() {
                LOG.info("flush() called");
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 9ee4ec0..53e4348 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -25,16 +25,16 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 
-public class SolrAuditProvider extends BaseAuditProvider {
+public class SolrAuditProvider extends AuditDestination {
        private static final Log LOG = LogFactory.getLog(SolrAuditProvider.class);
 
        public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.solr.async.max.queue.size";
@@ -56,11 +56,6 @@ public class SolrAuditProvider extends BaseAuditProvider {
                LOG.info("init() called");
                super.init(props);
 
-               setMaxQueueSize(MiscUtil.getIntProperty(props,
-                               AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
-               setMaxBatchInterval(MiscUtil.getIntProperty(props,
-                               AUDIT_MAX_QUEUE_SIZE_PROP,
-                               AUDIT_BATCH_INTERVAL_DEFAULT_MS));
                retryWaitTime = MiscUtil.getIntProperty(props,
                                AUDIT_RETRY_WAIT_PROP, retryWaitTime);
        }
@@ -241,29 +236,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
        public void waitToComplete(long timeout) {
                
        }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               // TODO Auto-generated method stub
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-        */
-       @Override
-       public long getLastFlushTime() {
-               // TODO Auto-generated method stub
-               return 0;
-       }
-
+       
        /*
         * (non-Javadoc)
         * 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index a6f291d..d16fff9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -21,32 +21,27 @@ package org.apache.ranger.audit.queue;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 
 /**
  * This is a non-blocking queue with no limit on capacity.
  */
-public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+public class AuditAsyncQueue extends AuditQueue implements Runnable {
        private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
 
-       LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+       LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>();
        Thread consumerThread = null;
 
        static final int MAX_DRAIN = 1000;
        static int threadCount = 0;
        static final String DEFAULT_NAME = "async";
 
-       public AuditAsyncQueue() {
-               setName(DEFAULT_NAME);
-       }
-
-       public AuditAsyncQueue(AuditProvider consumer) {
+       public AuditAsyncQueue(AuditHandler consumer) {
                super(consumer);
                setName(DEFAULT_NAME);
        }
@@ -65,7 +60,6 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
                        return false;
                }
                queue.add(event);
-               addLifeTimeInLogCount(1);
                return true;
        }
 
@@ -90,6 +84,9 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
        public void start() {
                if (consumer != null) {
                        consumer.start();
+               } else {
+                       logger.error("consumer is not set. Nothing will be sent to any consumer. name="
+                                       + getName());
                }
 
                consumerThread = new Thread(this, this.getClass().getName()
@@ -110,23 +107,10 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
                        if (consumerThread != null) {
                                consumerThread.interrupt();
                        }
-                       consumerThread = null;
                } catch (Throwable t) {
                        // ignore any exception
                }
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               if (queue.isEmpty()) {
-                       return consumer.isFlushPending();
-               }
-               return true;
+               consumerThread = null;
        }
 
        /*

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 5e21efc..8ed07bd 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -29,10 +29,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 
-public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
+public class AuditBatchQueue extends AuditQueue implements Runnable {
        private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
 
        private BlockingQueue<AuditEventBase> queue = null;
@@ -41,10 +40,7 @@ public class AuditBatchQueue extends BaseAuditProvider 
implements Runnable {
        Thread consumerThread = null;
        static int threadCount = 0;
 
-       public AuditBatchQueue() {
-       }
-
-       public AuditBatchQueue(AuditProvider consumer) {
+       public AuditBatchQueue(AuditHandler consumer) {
                super(consumer);
        }
 
@@ -59,7 +55,6 @@ public class AuditBatchQueue extends BaseAuditProvider 
implements Runnable {
        public boolean log(AuditEventBase event) {
                // Add to batchQueue. Block if full
                queue.add(event);
-               addLifeTimeInLogCount(1);
                return true;
        }
 
@@ -130,10 +125,10 @@ public class AuditBatchQueue extends BaseAuditProvider 
implements Runnable {
                        if (consumerThread != null) {
                                consumerThread.interrupt();
                        }
-                       consumerThread = null;
                } catch (Throwable t) {
                        // ignore any exception
                }
+               consumerThread = null;
        }
 
        /*
@@ -187,19 +182,6 @@ public class AuditBatchQueue extends BaseAuditProvider 
implements Runnable {
        /*
         * (non-Javadoc)
         * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               if (queue.isEmpty()) {
-                       return consumer.isFlushPending();
-               }
-               return true;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
         * @see org.apache.ranger.audit.provider.AuditProvider#flush()
         */
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
index 66d1573..a1c32b9 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -35,13 +35,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.MiscUtil;
 
 import com.google.gson.Gson;
@@ -69,10 +69,10 @@ public class AuditFileSpool implements Runnable {
        // "filespool.index.done_filename";
        public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = 
"filespool.destination.retry.ms";
 
-       AuditProvider queueProvider = null;
-       AuditProvider consumerProvider = null;
+       AuditQueue queueProvider = null;
+       AuditHandler consumerProvider = null;
 
-       BlockingQueue<AuditIndexRecord> indexQueue = new 
LinkedTransferQueue<AuditIndexRecord>();
+       BlockingQueue<AuditIndexRecord> indexQueue = new 
LinkedBlockingQueue<AuditIndexRecord>();
 
        // Folder and File attributes
        File logFolder = null;
@@ -108,10 +108,10 @@ public class AuditFileSpool implements Runnable {
        boolean isDrain = false;
        boolean isDestDown = true;
 
-       private static Gson gson = null;
+       private Gson gson = null;
 
-       public AuditFileSpool(AuditProvider queueProvider,
-                       AuditProvider consumerProvider) {
+       public AuditFileSpool(AuditQueue queueProvider,
+                       AuditHandler consumerProvider) {
                this.queueProvider = queueProvider;
                this.consumerProvider = consumerProvider;
        }
@@ -120,12 +120,12 @@ public class AuditFileSpool implements Runnable {
                init(prop, null);
        }
 
-       public void init(Properties props, String basePropertyName) {
+       public boolean init(Properties props, String basePropertyName) {
                if (initDone) {
                        logger.error("init() called more than once. 
queueProvider="
                                        + queueProvider.getName() + ", 
consumerProvider="
                                        + consumerProvider.getName());
-                       return;
+                       return true;
                }
                String propPrefix = "xasecure.audit.filespool";
                if (basePropertyName != null) {
@@ -162,22 +162,22 @@ public class AuditFileSpool implements Runnable {
                                        + queueProvider.getName());
 
                        if (logFolderProp == null || logFolderProp.isEmpty()) {
-                               logger.error("Audit spool folder is not 
configured. Please set "
+                               logger.fatal("Audit spool folder is not 
configured. Please set "
                                                + propPrefix
                                                + "."
                                                + PROP_FILE_SPOOL_LOCAL_DIR
                                                + ". queueName=" + 
queueProvider.getName());
-                               return;
+                               return false;
                        }
                        logFolder = new File(logFolderProp);
                        if (!logFolder.isDirectory()) {
                                logFolder.mkdirs();
                                if (!logFolder.isDirectory()) {
-                                       logger.error("File Spool folder not 
found and can't be created. folder="
+                                       logger.fatal("File Spool folder not 
found and can't be created. folder="
                                                        + 
logFolder.getAbsolutePath()
                                                        + ", queueName="
                                                        + 
queueProvider.getName());
-                                       return;
+                                       return false;
                                }
                        }
                        logger.info("logFolder=" + logFolder + ", queueName="
@@ -202,7 +202,7 @@ public class AuditFileSpool implements Runnable {
                                                        + 
archiveFolder.getAbsolutePath()
                                                        + ", queueName="
                                                        + 
queueProvider.getName());
-                                       return;
+                                       return false;
                                }
                        }
                        logger.info("archiveFolder=" + archiveFolder + ", 
queueName="
@@ -218,17 +218,30 @@ public class AuditFileSpool implements Runnable {
 
                        indexFile = new File(logFolder, indexFileName);
                        if (!indexFile.exists()) {
-                               indexFile.createNewFile();
+                               boolean ret = indexFile.createNewFile();
+                               if (!ret) {
+                                       logger.fatal("Error creating index 
file. fileName="
+                                                       + 
indexDoneFile.getPath());
+                                       return false;
+                               }
                        }
                        logger.info("indexFile=" + indexFile + ", queueName="
                                        + queueProvider.getName());
 
                        int lastDot = indexFileName.lastIndexOf('.');
+                       if (lastDot < 0) {
+                               lastDot = indexFileName.length() - 1;
+                       }
                        indexDoneFileName = indexFileName.substring(0, lastDot)
                                        + "_closed.json";
                        indexDoneFile = new File(logFolder, indexDoneFileName);
                        if (!indexDoneFile.exists()) {
-                               indexDoneFile.createNewFile();
+                               boolean ret = indexDoneFile.createNewFile();
+                               if (!ret) {
+                                       logger.fatal("Error creating index done 
file. fileName="
+                                                       + 
indexDoneFile.getPath());
+                                       return false;
+                               }
                        }
                        logger.info("indexDoneFile=" + indexDoneFile + ", 
queueName="
                                        + queueProvider.getName());
@@ -252,8 +265,6 @@ public class AuditFileSpool implements Runnable {
                                }
                        }
                        printIndex();
-                       // One more loop to add the rest of the pending records 
in reverse
-                       // order
                        for (int i = 0; i < indexRecords.size(); i++) {
                                AuditIndexRecord auditIndexRecord = 
indexRecords.get(i);
                                if 
(auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
@@ -261,18 +272,19 @@ public class AuditFileSpool implements Runnable {
                                        if (!consumerFile.exists()) {
                                                logger.error("INIT: Consumer 
file="
                                                                + 
consumerFile.getPath() + " not found.");
-                                               System.exit(1);
+                                       } else {
+                                               
indexQueue.add(auditIndexRecord);
                                        }
-                                       indexQueue.add(auditIndexRecord);
                                }
                        }
 
                } catch (Throwable t) {
                        logger.fatal("Error initializing File Spooler. queue="
                                        + queueProvider.getName(), t);
-                       return;
+                       return false;
                }
                initDone = true;
+               return true;
        }
 
        /**
@@ -328,6 +340,7 @@ public class AuditFileSpool implements Runnable {
 
                                        out.flush();
                                        out.close();
+                                       break;
                                } catch (Throwable t) {
                                        logger.debug("Error closing spool out 
file.", t);
                                }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
new file mode 100644
index 0000000..4c3ac5f
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.provider.AuditHandler;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * Common base for audit queues that sit in front of a downstream
+ * {@link AuditHandler} (the "consumer").
+ * <p>
+ * This class holds the queue and batch sizing settings, optionally creates an
+ * {@link AuditFileSpool} for the queue, and delegates {@code flush()} and
+ * {@code waitToComplete()} to the consumer.
+ */
+public abstract class AuditQueue extends BaseAuditHandler {
+       private static final Log LOG = LogFactory.getLog(AuditQueue.class);
+
+       /** Initial value of {@link #getMaxQueueSize()}. */
+       public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
+       /** Initial value of {@link #getMaxBatchInterval()}, in milliseconds. */
+       public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
+       /** Initial value of {@link #getMaxBatchSize()}. */
+       public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+       private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
+       private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
+       private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
+
+       public static final String PROP_QUEUE = "queue";
+
+       // Property name suffixes; init() looks them up as propPrefix + "." + suffix.
+       public static final String PROP_BATCH_SIZE = "batch.size";
+       public static final String PROP_QUEUE_SIZE = "queue.size";
+       public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
+
+       public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
+       public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
+       public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
+
+       /** Downstream handler that flush() and waitToComplete() are delegated to. */
+       final protected AuditHandler consumer;
+       /** Created by init() only when file spooling is requested; otherwise null. */
+       protected AuditFileSpool fileSpooler = null;
+
+       private boolean isDrain = false;
+
+       protected boolean fileSpoolerEnabled = false;
+       protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
+       protected int fileSpoolDrainThresholdPercent = 80;
+
+       /**
+        * @param consumer
+        *            handler this queue delegates to. The delegating methods
+        *            in this class tolerate null, but init() calls
+        *            consumer.getName() if the file spooler fails to start.
+        */
+       public AuditQueue(AuditHandler consumer) {
+               this.consumer = consumer;
+       }
+
+       /**
+        * Reads the queue settings from {@code props} and, when file spooling is
+        * requested, creates and initializes the {@link AuditFileSpool}.
+        * <p>
+        * Spooling is requested when the {@code filespool.enable} flag is read as
+        * true or when the spool local directory property is present. If the
+        * spooler fails to initialize, {@code fileSpoolerEnabled} is reset to
+        * false and the failure is logged; the {@code fileSpooler} reference
+        * itself is left set.
+        *
+        * @param props
+        *            configuration to read from
+        * @param basePropertyName
+        *            passed through to super.init() and to the spooler's init()
+        */
+       @Override
+       public void init(Properties props, String basePropertyName) {
+               LOG.info("AuditQueue.init()");
+               super.init(props, basePropertyName);
+
+               // NOTE(review): propPrefix is not declared in this class; presumably
+               // it is set up by super.init() - confirm against BaseAuditHandler.
+               setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+                               + PROP_BATCH_SIZE, getMaxBatchSize()));
+               setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+                               + PROP_QUEUE_SIZE, getMaxQueueSize()));
+               setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+                               + PROP_BATCH_INTERVAL, getMaxBatchInterval()));
+
+               fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+                               + "." + PROP_FILE_SPOOL_ENABLE, false);
+               String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+                               + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
+               if (fileSpoolerEnabled || logFolderProp != null) {
+                       // Log the enable flag under its own property name; the spool
+                       // folder value is already reported as logFolderProp.
+                       LOG.info("File spool is enabled for " + getName()
+                                       + ", logFolderProp=" + logFolderProp + ", " + propPrefix
+                                       + "." + PROP_FILE_SPOOL_ENABLE + "="
+                                       + fileSpoolerEnabled);
+                       // A configured spool folder implies spooling even without the flag.
+                       fileSpoolerEnabled = true;
+                       fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+                                       + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
+                                       fileSpoolMaxWaitTime);
+                       fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
+                                       propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
+                                       fileSpoolDrainThresholdPercent);
+                       fileSpooler = new AuditFileSpool(this, consumer);
+                       if (!fileSpooler.init(props, basePropertyName)) {
+                               fileSpoolerEnabled = false;
+                               LOG.fatal("Couldn't initialize file spooler. Disabling it. queue="
+                                               + getName() + ", consumer=" + consumer.getName());
+                       }
+               } else {
+                       LOG.info("File spool is disabled for " + getName());
+               }
+
+       }
+
+       /** @return the downstream handler passed to the constructor */
+       public AuditHandler getConsumer() {
+               return consumer;
+       }
+
+       /** @return the drain flag last set through {@link #setDrain(boolean)} */
+       public boolean isDrain() {
+               return isDrain;
+       }
+
+       public void setDrain(boolean isDrain) {
+               this.isDrain = isDrain;
+       }
+
+       public int getMaxQueueSize() {
+               return maxQueueSize;
+       }
+
+       public void setMaxQueueSize(int maxQueueSize) {
+               this.maxQueueSize = maxQueueSize;
+       }
+
+       /** @return the batch interval; the default constant is in milliseconds */
+       public int getMaxBatchInterval() {
+               return maxBatchInterval;
+       }
+
+       public void setMaxBatchInterval(int maxBatchInterval) {
+               this.maxBatchInterval = maxBatchInterval;
+       }
+
+       public int getMaxBatchSize() {
+               return maxBatchSize;
+       }
+
+       public void setMaxBatchSize(int maxBatchSize) {
+               this.maxBatchSize = maxBatchSize;
+       }
+
+       /**
+        * Delegates to the consumer's {@code waitToComplete(long)} with a
+        * timeout argument of -1. Does nothing when there is no consumer.
+        */
+       @Override
+       public void waitToComplete() {
+               if (consumer != null) {
+                       consumer.waitToComplete(-1);
+               }
+       }
+
+       /**
+        * Delegates to the consumer's {@code waitToComplete(long)}. Does nothing
+        * when there is no consumer.
+        *
+        * @param timeout
+        *            passed through unchanged to the consumer
+        */
+       @Override
+       public void waitToComplete(long timeout) {
+               if (consumer != null) {
+                       consumer.waitToComplete(timeout);
+               }
+       }
+
+       /**
+        * Delegates to the consumer's {@code flush()}. Does nothing when there is
+        * no consumer.
+        */
+       @Override
+       public void flush() {
+               if (consumer != null) {
+                       consumer.flush();
+               }
+       }
+
+}

Reply via email to